/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile
 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and
 * values like those of {@link PerformanceEvaluation}.
 */
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestCellBasedHFileOutputFormat2 {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCellBasedHFileOutputFormat2.class);

  private final static int ROWSPERSPLIT = 1024;

  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
  private static final byte[][] FAMILIES =
    { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) };
  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3")
    .map(TableName::valueOf).toArray(TableName[]::new);

  private HBaseTestingUtility util = new HBaseTestingUtility();

  private static final Logger LOG = LoggerFactory.getLogger(TestCellBasedHFileOutputFormat2.class);

  /**
   * Simple mapper that makes KeyValue output.
   */
  static class RandomKVGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {

      byte keyBytes[] = new byte[keyLength];
      byte valBytes[] = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
      Random random = new Random();
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          random.nextBytes(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          random.nextBytes(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) {
            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
            context.write(new ImmutableBytesWritable(key), kv);
          }
        }
      }
    }
  }

  /**
   * Simple mapper that makes Put output.
   */
  static class RandomPutGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
      throws java.io.IOException, InterruptedException {

      byte keyBytes[] = new byte[keyLength];
      byte valBytes[] = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      Random random = new Random();
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          random.nextBytes(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          random.nextBytes(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) {
            Put p = new Put(keyBytes);
            p.addColumn(family, QUALIFIER, valBytes);
            // set TTL to very low so that the scan does not return any value
            p.setTTL(1L);
            context.write(new ImmutableBytesWritable(key), p);
          }
        }
      }
    }
  }

  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
    if (putSortReducer) {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomPutGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Put.class);
    } else {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomKVGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose
   * timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
      // changed by call to write. Check all in kv is same but ts.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotSame(original.getTimestamp(), kv.getTimestamp());
      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. Now test passing a kv that has explicit ts. It should not be
      // changed by call to record write.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context =
      hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0");
    return context;
  }

  /*
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by
   * time-restricted scans.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Timerange dir writing to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat2
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // Pass two key values with explicit timestamps
      final byte[] b = Bytes.toBytes("b");

      // value 1 with timestamp 2000
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // value 2 with timestamp 1000
      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // verify that the file has the proper FileInfo.
      writer.close(context);

      // the generated file lives 1 directory down from the attempt directory
      // and is the only file, e.g.
      // _attempt__0000_r_000000_0/b/1979617994050536795
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as HFile Reader and pull out TIMERANGE FileInfo.
      HFile.Reader rd =
        HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
      Map<byte[], byte[]> finfo = rd.getHFileInfo();
      byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8"));
      assertNotNull(range);

      // unmarshall and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
      LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
      assertEquals(1000, timeRangeTracker.getMin());
      assertEquals(2000, timeRangeTracker.getMax());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Run small MR job.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Set down this value or we OOME in eclipse.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Write a few files.
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys but using it anyway
    // just to demonstrate how to configure it.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte) 0);
    Arrays.fill(endKey, (byte) 0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(CellSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile.
   */
  @Test
  public void test_WritingTagData() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("WritingTagData");
    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      List<Tag> tags = new ArrayList<>();
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(conf, false, false, false);
        scanner.seekTo();
        Cell cell = scanner.getCell();
        List<Tag> tagsFromCell =
          TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
        assertTrue(tagsFromCell.size() > 0);
        for (Tag tag : tagsFromCell) {
          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
        }
      }
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      util.getDataTestDir("testJobConfiguration").toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    setupMockTableName(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
    assertEquals(job.getNumReduceTasks(), 4);
  }

  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    // first region start key is always empty
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
  }

  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true. This test can only check the
   * correctness of the original logic if LOCALITY_SENSITIVE_CONF_KEY is set to true. Because
   * MiniHBaseCluster always runs with a single hostname (and different ports), it is not possible
   * to check region locality by comparing region locations and DN hostnames. When MiniHBaseCluster
   * supports an explicit hostnames parameter (just like MiniDFSCluster does), we could test region
   * locality features more easily.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithLocality() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
  }

  // @Ignore("Whatevs")
  @Test
  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, String tableStr) throws Exception {
    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, false,
      Arrays.asList(tableStr));
  }

  @Test
  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true, false,
      Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
  }

  @Test
  public void testMultiMRIncrementalLoadWithPutSortReducerWithNamespaceInPath() throws Exception {
    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducerWithNamespaceInPath\n");
    doIncrementalLoadTest(false, false, true, true,
      Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, boolean shouldWriteToTableWithNamespace, List<String> tableStr)
    throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
    if (shouldWriteToTableWithNamespace) {
      conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true);
    }
    int hostCount = 1;
    int regionNum = 5;
    if (shouldKeepLocality) {
      // We should raise the host count above the HDFS replica count once MiniHBaseCluster
      // supports an explicit hostnames parameter just like MiniDFSCluster does.
      hostCount = 3;
      regionNum = 20;
    }

    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    util.startMiniCluster(1, hostCount, hostnames);

    Map<String, Table> allTables = new HashMap<>(tableStr.size());
    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
    boolean writeMultipleTables = tableStr.size() > 1;
    for (String tableStrSingle : tableStr) {
      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
      TableName tableName = TableName.valueOf(tableStrSingle);
      Table table = util.createTable(tableName, FAMILIES, splitKeys);

      RegionLocator r = util.getConnection().getRegionLocator(tableName);
      assertEquals("Should start with empty table", 0, util.countRows(table));
      int numRegions = r.getStartKeys().length;
      assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);

      allTables.put(tableStrSingle, table);
      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
    }
    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
    // Generate the bulk load files
    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
    if (shouldWriteToTableWithNamespace) {
      testDir = new Path(testDir, "default");
    }

    for (Table tableSingle : allTables.values()) {
      // This doesn't write into the table, just makes files
      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
    }
    int numTableDirs = 0;
    FileStatus[] fss = testDir.getFileSystem(conf).listStatus(testDir);
    for (FileStatus tf : fss) {
      Path tablePath = testDir;

      if (writeMultipleTables) {
        if (allTables.containsKey(tf.getPath().getName())) {
          ++numTableDirs;
          tablePath = tf.getPath();
        } else {
          continue;
        }
      }

      // Make sure that a directory was created for every CF
      int dir = 0;
      fss = tablePath.getFileSystem(conf).listStatus(tablePath);
      for (FileStatus f : fss) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
    }
    if (writeMultipleTables) {
      assertEquals("Dir for all input tables not created", numTableDirs, allTables.size());
    }

    Admin admin = util.getConnection().getAdmin();
    try {
      // handle the split case
      if (shouldChangeRegions) {
        Table chosenTable = allTables.values().iterator().next();
        // Choose a semi-random table if multiple tables are available
        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
        admin.disableTable(chosenTable.getName());
        util.waitUntilNoRegionsInTransition();

        util.deleteTable(chosenTable.getName());
        byte[][] newSplitKeys = generateRandomSplitKeys(14);
        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);

        while (
          util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations()
            .size() != 15 || !admin.isTableAvailable(table.getName())
        ) {
          Thread.sleep(200);
          LOG.info("Waiting for new region assignment to happen");
        }
      }

      // Perform the actual load
      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
        Path tableDir = testDir;
        String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString();
LOG.info("Running LoadIncrementalHFiles on table" + tableNameStr); 696 if (writeMultipleTables) { 697 tableDir = new Path(testDir, tableNameStr); 698 } 699 Table currentTable = allTables.get(tableNameStr); 700 TableName currentTableName = currentTable.getName(); 701 new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, 702 singleTableInfo.getRegionLocator()); 703 704 // Ensure data shows up 705 int expectedRows = 0; 706 if (putSortReducer) { 707 // no rows should be extracted 708 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 709 util.countRows(currentTable)); 710 } else { 711 expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 712 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 713 util.countRows(currentTable)); 714 Scan scan = new Scan(); 715 ResultScanner results = currentTable.getScanner(scan); 716 for (Result res : results) { 717 assertEquals(FAMILIES.length, res.rawCells().length); 718 Cell first = res.rawCells()[0]; 719 for (Cell kv : res.rawCells()) { 720 assertTrue(CellUtil.matchingRows(first, kv)); 721 assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); 722 } 723 } 724 results.close(); 725 } 726 String tableDigestBefore = util.checksumRows(currentTable); 727 // Check region locality 728 HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); 729 for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) { 730 hbd.add(region.getHDFSBlocksDistribution()); 731 } 732 for (String hostname : hostnames) { 733 float locality = hbd.getBlockLocalityIndex(hostname); 734 LOG.info("locality of [" + hostname + "]: " + locality); 735 assertEquals(100, (int) (locality * 100)); 736 } 737 738 // Cause regions to reopen 739 admin.disableTable(currentTableName); 740 while (!admin.isTableDisabled(currentTableName)) { 741 Thread.sleep(200); 742 LOG.info("Waiting for table to disable"); 743 } 744 admin.enableTable(currentTableName); 745 util.waitTableAvailable(currentTableName); 746 assertEquals("Data should remain after reopening of regions", tableDigestBefore, 747 util.checksumRows(currentTable)); 748 } 749 } finally { 750 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 751 tableInfoSingle.getRegionLocator().close(); 752 } 753 for (Entry<String, Table> singleTable : allTables.entrySet()) { 754 singleTable.getValue().close(); 755 util.deleteTable(singleTable.getValue().getName()); 756 } 757 testDir.getFileSystem(conf).delete(testDir, true); 758 util.shutdownMiniCluster(); 759 } 760 } 761 762 private void runIncrementalPELoad(Configuration conf, 763 List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer) 764 throws IOException, InterruptedException, ClassNotFoundException { 765 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 766 job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); 767 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 768 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 769 CellSerialization.class.getName()); 770 setupRandomGeneratorMapper(job, putSortReducer); 771 if (tableInfo.size() > 1) { 772 MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo); 773 int sum = 0; 774 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 775 sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size(); 776 } 777 assertEquals(sum, job.getNumReduceTasks()); 778 } else { 
      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(),
        regionLocator);
      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
    }

    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(util.getTestFileSystem().exists(outDir));

    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Test for {@link HFileOutputFormat2#configureCompression(Configuration, HTableDescriptor)} and
   * {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the
   * compression map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
        getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific compression setting from the configuration
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap =
        HFileOutputFormat2.createFamilyCompressionMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
    Map<String, Compression.Algorithm> familyToCompression) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to compression algorithms for testing column family
   *         compression. Column family names have special characters
   */
  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }

  /**
   * Test for {@link HFileOutputFormat2#configureBloomType(HTableDescriptor, Configuration)} and
   * {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the bloom type
   * map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific bloom type settings from the
      // configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
        HFileOutputFormat2.createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
    Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to bloom filter types for testing column family bloom
   *         type serialization. Column family names have special characters
   */
  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

  /**
   * Test for {@link HFileOutputFormat2#configureBlockSize(HTableDescriptor, Configuration)} and
   * {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the block size
   * map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific block size settings from the
      // configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
        HFileOutputFormat2.createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
    Map<String, Integer> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to block sizes for testing column family block size
   *         serialization. Column family names have special characters
   */
  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

  /**
   * Test for {@link HFileOutputFormat2#configureDataBlockEncoding(HTableDescriptor, Configuration)}
   * and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that the
   * data block encoding map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
      HTableDescriptor tableDescriptor = table.getTableDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(
          HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));

      // read back family specific data block encoding settings from the
      // configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
        HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals(
          "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(),
          retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
    Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to data block encodings for testing column family data
   *         block encoding serialization. Column family names have special characters
   */
  private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

  private void setupMockStartKeys(RegionLocator table) throws IOException {
    byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"),
      Bytes.toBytes("ggg"), Bytes.toBytes("zzz") };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();
  }

  private void setupMockTableName(RegionLocator table) throws IOException {
    TableName mockTableName = TableName.valueOf("mock_table");
    Mockito.doReturn(mockTableName).when(table).getName();
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings
   * from the column family descriptor
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Setup table descriptor
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
    Mockito.doReturn(htd).when(table).getTableDescriptor();
    for (HColumnDescriptor hcd : HBaseTestingUtility.generateColumnDescriptors()) {
      htd.addFamily(hcd);
    }

    // set up the table to return some mock keys
    setupMockStartKeys(regionLocator);

    try {
      // partial map red setup to get an operational writer for testing
      // We turn off the sequence file compression, because DefaultCodec
      // pollutes the GZip codec pool with an incompatible compressor.
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // write out random rows
      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
      writer.close(context);

      // Make sure that a directory was created for every CF
      FileSystem fs = dir.getFileSystem(conf);

      // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(htd.getFamilies().size(), families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
        // verify that the compression on this file matches the configured
        // compression
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();

        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals(
          "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader + ")",
          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals(
          "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")",
          hcd.getCompressionType(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Write random values to the writer assuming a table created using {@link #FAMILIES} as column
   * family descriptors
   */
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
    TaskAttemptContext context, Set<byte[]> families, int numRows)
    throws IOException, InterruptedException {
    byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte valBytes[] = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
    final byte[] qualifier = Bytes.toBytes("data");
    Random random = new Random();
    for (int i = 0; i < numRows; i++) {

      Bytes.putInt(keyBytes, 0, i);
      random.nextBytes(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

      for (byte[] family : families) {
        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }

  /**
   * This test covers the scenario described in HBASE-6901. All files are bulk loaded and
   * excluded from minor compaction. Without the fix of HBASE-6901, an
   * ArrayIndexOutOfBoundsException will be thrown.
   */
  @Ignore("Flakey: See HBASE-9051")
  @Test
  public void testExcludeAllFromMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin();
      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
      RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // deep inspection: get the StoreFile dir
      final Path storePath =
        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
            Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // Generate two bulk load files
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);

      for (int i = 0; i < 2; i++) {
        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
        runIncrementalPELoad(conf,
          Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(),
            conn.getRegionLocator(TABLE_NAMES[0]))),
          testDir, false);
        // Perform the actual load
        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
      }

      // Ensure data shows up
      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
        util.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file
      admin.compact(TABLE_NAMES[0]);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
            for (HRegion region : regions) {
              for (HStore store : region.getStores()) {
                store.closeAndArchiveCompactedFiles();
              }
            }
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAMES[0]);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
          for (HRegion region : regions) {
            for (HStore store : region.getStores()) {
              store.closeAndArchiveCompactedFiles();
            }
          }
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniCluster();
    }
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testExcludeMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // deep inspection: get the StoreFile dir
      final Path storePath =
        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
            Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // put some data in it and flush to create a storefile
      Put p = new Put(Bytes.toBytes("test"));
      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
      table.put(p);
      admin.flush(TABLE_NAMES[0]);
      assertEquals(1, util.countRows(table));
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

      // Generate a bulk load file with more rows
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);

      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
      runIncrementalPELoad(conf,
        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), regionLocator)),
        testDir, false);

      // Perform the actual load
      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);

      // Ensure data shows up
      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows + 1,
        util.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file
      admin.compact(TABLE_NAMES[0]);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAMES[0]);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniCluster();
    }
  }

  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
    int sleepMs = 10;
    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
    while (retries-- > 0) {
      if (c.call().booleanValue()) {
        return;
      }
      Thread.sleep(sleepMs);
    }
    fail();
  }

  public static void main(String args[]) throws Exception {
    new TestCellBasedHFileOutputFormat2().manualTest(args);
  }

  public void manualTest(String args[]) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    util = new HBaseTestingUtility(conf);
    if ("newtable".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      byte[][] splitKeys = generateRandomSplitKeys(4);
      Table table = util.createTable(tname, FAMILIES, splitKeys);
    } else if ("incremental".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
        RegionLocator regionLocator = c.getRegionLocator(tname)) {
        Path outDir = new Path("incremental-out");
        runIncrementalPELoad(conf,
          Arrays.asList(
            new HFileOutputFormat2.TableInfo(admin.getTableDescriptor(tname), regionLocator)),
          outDir, false);
      }
    } else {
      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
    }
  }

  @Test
  public void testBlockStoragePolicy() throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");

    conf.set(
      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
      "ONE_SSD");
    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
    util.startMiniDFSCluster(3);
    FileSystem fs = util.getDFSCluster().getFileSystem();
    try {
      fs.mkdirs(cf1Dir);
      fs.mkdirs(cf2Dir);

      // the original block storage policy would be HOT
      String spA = getStoragePolicyName(fs, cf1Dir);
      String spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertEquals("HOT", spA);
      assertEquals("HOT", spB);

      // alter table cf schema to change storage policies
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
      spA = getStoragePolicyName(fs, cf1Dir);
      spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertNotNull(spA);
      assertEquals("ONE_SSD", spA);
      assertNotNull(spB);
      assertEquals("ALL_SSD", spB);
    } finally {
      fs.delete(cf1Dir, true);
      fs.delete(cf2Dir, true);
      util.shutdownMiniDFSCluster();
    }
  }

  private String getStoragePolicyName(FileSystem fs, Path path) {
    try {
      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
    } catch (Exception e) {
      // May fail when running against an older HDFS version; fall back to the old way
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
      return policy == null ? "HOT" : policy; // HOT by default
"HOT" : policy;// HOT by default 1425 } 1426 } 1427 1428 private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) { 1429 try { 1430 if (fs instanceof DistributedFileSystem) { 1431 DistributedFileSystem dfs = (DistributedFileSystem) fs; 1432 HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); 1433 if (null != status) { 1434 byte storagePolicyId = status.getStoragePolicy(); 1435 Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); 1436 if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) { 1437 BlockStoragePolicy[] policies = dfs.getStoragePolicies(); 1438 for (BlockStoragePolicy policy : policies) { 1439 if (policy.getId() == storagePolicyId) { 1440 return policy.getName(); 1441 } 1442 } 1443 } 1444 } 1445 } 1446 } catch (Throwable e) { 1447 LOG.warn("failed to get block storage policy of [" + path + "]", e); 1448 } 1449 1450 return null; 1451 } 1452}