001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.Mockito.doAnswer; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.when; 028 029import java.io.ByteArrayOutputStream; 030import java.io.File; 031import java.io.IOException; 032import java.io.PrintStream; 033import java.net.URL; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.List; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellUtil; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseTestingUtility; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.KeepDeletedCells; 046import org.apache.hadoop.hbase.KeyValue; 047import org.apache.hadoop.hbase.PrivateCellUtil; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 050import org.apache.hadoop.hbase.client.Delete; 051import org.apache.hadoop.hbase.client.Durability; 052import org.apache.hadoop.hbase.client.Get; 053import org.apache.hadoop.hbase.client.Put; 054import org.apache.hadoop.hbase.client.RegionInfo; 055import org.apache.hadoop.hbase.client.Result; 056import org.apache.hadoop.hbase.client.ResultScanner; 057import org.apache.hadoop.hbase.client.Scan; 058import org.apache.hadoop.hbase.client.Table; 059import org.apache.hadoop.hbase.client.TableDescriptor; 060import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 061import org.apache.hadoop.hbase.filter.Filter; 062import org.apache.hadoop.hbase.filter.FilterBase; 063import org.apache.hadoop.hbase.filter.PrefixFilter; 064import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 065import org.apache.hadoop.hbase.mapreduce.Import.CellImporter; 066import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 067import org.apache.hadoop.hbase.testclassification.MediumTests; 068import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 071import org.apache.hadoop.hbase.util.LauncherSecurityManager; 072import org.apache.hadoop.hbase.util.MapReduceExtendedCell; 073import org.apache.hadoop.hbase.wal.WAL; 074import org.apache.hadoop.hbase.wal.WALEdit; 075import org.apache.hadoop.hbase.wal.WALKey; 076import org.apache.hadoop.mapreduce.Mapper.Context; 077import org.apache.hadoop.util.GenericOptionsParser; 078import org.apache.hadoop.util.ToolRunner; 079import org.junit.After; 080import org.junit.AfterClass; 081import org.junit.Assert; 082import org.junit.Before; 083import org.junit.BeforeClass; 084import org.junit.ClassRule; 085import org.junit.Rule; 086import org.junit.Test; 087import org.junit.experimental.categories.Category; 088import org.junit.rules.TestName; 089import org.mockito.invocation.InvocationOnMock; 090import org.mockito.stubbing.Answer; 091import org.slf4j.Logger; 092import org.slf4j.LoggerFactory; 093 094/** 095 * Tests the table import and table export MR job functionality 096 */ 097@Category({ VerySlowMapReduceTests.class, MediumTests.class }) 098public class TestCellBasedImportExport2 { 099 100 @ClassRule 101 public static final HBaseClassTestRule CLASS_RULE = 102 HBaseClassTestRule.forClass(TestCellBasedImportExport2.class); 103 104 private static final Logger LOG = LoggerFactory.getLogger(TestCellBasedImportExport2.class); 105 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 106 private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1"); 107 private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2"); 108 private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3"); 109 private static final String FAMILYA_STRING = "a"; 110 private static final String FAMILYB_STRING = "b"; 111 private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING); 112 private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING); 113 private static final byte[] QUAL = Bytes.toBytes("q"); 114 private static final String OUTPUT_DIR = "outputdir"; 115 private static String FQ_OUTPUT_DIR; 116 private static final String EXPORT_BATCH_SIZE = "100"; 117 118 private static final long now = EnvironmentEdgeManager.currentTime(); 119 private final TableName EXPORT_TABLE = TableName.valueOf("export_table"); 120 private final TableName IMPORT_TABLE = TableName.valueOf("import_table"); 121 122 @BeforeClass 123 public static void beforeClass() throws Throwable { 124 // Up the handlers; this test needs more than usual. 125 UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); 126 UTIL.startMiniCluster(); 127 FQ_OUTPUT_DIR = 128 new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString(); 129 } 130 131 @AfterClass 132 public static void afterClass() throws Throwable { 133 UTIL.shutdownMiniCluster(); 134 } 135 136 @Rule 137 public final TestName name = new TestName(); 138 139 @Before 140 public void announce() { 141 LOG.info("Running " + name.getMethodName()); 142 } 143 144 @After 145 public void cleanup() throws Throwable { 146 FileSystem fs = FileSystem.get(UTIL.getConfiguration()); 147 fs.delete(new Path(OUTPUT_DIR), true); 148 if (UTIL.getAdmin().tableExists(EXPORT_TABLE)) { 149 UTIL.deleteTable(EXPORT_TABLE); 150 } 151 if (UTIL.getAdmin().tableExists(IMPORT_TABLE)) { 152 UTIL.deleteTable(IMPORT_TABLE); 153 } 154 } 155 156 /** 157 * Runs an export job with the specified command line args 158 * @return true if job completed successfully 159 */ 160 protected boolean runExport(String[] args) throws Throwable { 161 // need to make a copy of the configuration because to make sure different temp dirs are used. 162 int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args); 163 return status == 0; 164 } 165 166 protected void runExportMain(String[] args) throws Throwable { 167 Export.main(args); 168 } 169 170 /** 171 * Runs an import job with the specified command line args 172 * @return true if job completed successfully 173 */ 174 boolean runImport(String[] args) throws Throwable { 175 // need to make a copy of the configuration because to make sure different temp dirs are used. 176 int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args); 177 return status == 0; 178 } 179 180 /** 181 * Test simple replication case with column mapping 182 */ 183 @Test 184 public void testSimpleCase() throws Throwable { 185 try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3);) { 186 Put p = new Put(ROW1); 187 p.addColumn(FAMILYA, QUAL, now, QUAL); 188 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 189 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 190 t.put(p); 191 p = new Put(ROW2); 192 p.addColumn(FAMILYA, QUAL, now, QUAL); 193 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 194 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 195 t.put(p); 196 p = new Put(ROW3); 197 p.addColumn(FAMILYA, QUAL, now, QUAL); 198 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 199 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 200 t.put(p); 201 } 202 203 String[] args = new String[] { 204 // Only export row1 & row2. 205 "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1", 206 "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3", name.getMethodName(), FQ_OUTPUT_DIR, 207 "1000", // max number of key versions per key to export 208 }; 209 assertTrue(runExport(args)); 210 211 final String IMPORT_TABLE = name.getMethodName() + "import"; 212 try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) { 213 args = 214 new String[] { "-D" + Import.CF_RENAME_PROP + "=" + FAMILYA_STRING + ":" + FAMILYB_STRING, 215 IMPORT_TABLE, FQ_OUTPUT_DIR }; 216 assertTrue(runImport(args)); 217 218 Get g = new Get(ROW1); 219 g.setMaxVersions(); 220 Result r = t.get(g); 221 assertEquals(3, r.size()); 222 g = new Get(ROW2); 223 g.setMaxVersions(); 224 r = t.get(g); 225 assertEquals(3, r.size()); 226 g = new Get(ROW3); 227 r = t.get(g); 228 assertEquals(0, r.size()); 229 } 230 } 231 232 /** 233 * Test export hbase:meta table 234 */ 235 @Test 236 public void testMetaExport() throws Throwable { 237 String[] args = 238 new String[] { TableName.META_TABLE_NAME.getNameAsString(), FQ_OUTPUT_DIR, "1", "0", "0" }; 239 assertTrue(runExport(args)); 240 } 241 242 /** 243 * Test import data from 0.94 exported file 244 */ 245 @Test 246 public void testImport94Table() throws Throwable { 247 final String name = "exportedTableIn94Format"; 248 URL url = TestCellBasedImportExport2.class.getResource(name); 249 File f = new File(url.toURI()); 250 if (!f.exists()) { 251 LOG.warn("FAILED TO FIND " + f + "; skipping out on test"); 252 return; 253 } 254 assertTrue(f.exists()); 255 LOG.info("FILE=" + f); 256 Path importPath = new Path(f.toURI()); 257 FileSystem fs = FileSystem.get(UTIL.getConfiguration()); 258 fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name)); 259 String IMPORT_TABLE = name; 260 try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) { 261 String[] args = new String[] { "-Dhbase.import.version=0.94", IMPORT_TABLE, FQ_OUTPUT_DIR }; 262 assertTrue(runImport(args)); 263 /* 264 * exportedTableIn94Format contains 5 rows ROW COLUMN+CELL r1 column=f1:c1, 265 * timestamp=1383766761171, value=val1 r2 column=f1:c1, timestamp=1383766771642, value=val2 r3 266 * column=f1:c1, timestamp=1383766777615, value=val3 r4 column=f1:c1, timestamp=1383766785146, 267 * value=val4 r5 column=f1:c1, timestamp=1383766791506, value=val5 268 */ 269 assertEquals(5, UTIL.countRows(t)); 270 } 271 } 272 273 /** 274 * Test export scanner batching 275 */ 276 @Test 277 public void testExportScannerBatching() throws Throwable { 278 TableDescriptor desc = TableDescriptorBuilder 279 .newBuilder(TableName.valueOf(name.getMethodName())) 280 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(1).build()) 281 .build(); 282 UTIL.getAdmin().createTable(desc); 283 try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { 284 285 Put p = new Put(ROW1); 286 p.addColumn(FAMILYA, QUAL, now, QUAL); 287 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 288 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 289 p.addColumn(FAMILYA, QUAL, now + 3, QUAL); 290 p.addColumn(FAMILYA, QUAL, now + 4, QUAL); 291 t.put(p); 292 293 String[] args = new String[] { "-D" + ExportUtils.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added 294 // scanner 295 // batching 296 // arg. 297 name.getMethodName(), FQ_OUTPUT_DIR }; 298 assertTrue(runExport(args)); 299 300 FileSystem fs = FileSystem.get(UTIL.getConfiguration()); 301 fs.delete(new Path(FQ_OUTPUT_DIR), true); 302 } 303 } 304 305 @Test 306 public void testWithDeletes() throws Throwable { 307 TableDescriptor desc = 308 TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) 309 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5) 310 .setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 311 .build(); 312 UTIL.getAdmin().createTable(desc); 313 try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { 314 315 Put p = new Put(ROW1); 316 p.addColumn(FAMILYA, QUAL, now, QUAL); 317 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 318 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 319 p.addColumn(FAMILYA, QUAL, now + 3, QUAL); 320 p.addColumn(FAMILYA, QUAL, now + 4, QUAL); 321 t.put(p); 322 323 Delete d = new Delete(ROW1, now + 3); 324 t.delete(d); 325 d = new Delete(ROW1); 326 d.addColumns(FAMILYA, QUAL, now + 2); 327 t.delete(d); 328 } 329 330 String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true", name.getMethodName(), 331 FQ_OUTPUT_DIR, "1000", // max number of key versions per key to export 332 }; 333 assertTrue(runExport(args)); 334 335 final String IMPORT_TABLE = name.getMethodName() + "import"; 336 desc = TableDescriptorBuilder 337 .newBuilder(TableName.valueOf(IMPORT_TABLE)).setColumnFamily(ColumnFamilyDescriptorBuilder 338 .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 339 .build(); 340 UTIL.getAdmin().createTable(desc); 341 try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { 342 args = new String[] { IMPORT_TABLE, FQ_OUTPUT_DIR }; 343 assertTrue(runImport(args)); 344 345 Scan s = new Scan(); 346 s.setMaxVersions(); 347 s.setRaw(true); 348 ResultScanner scanner = t.getScanner(s); 349 Result r = scanner.next(); 350 Cell[] res = r.rawCells(); 351 assertTrue(PrivateCellUtil.isDeleteFamily(res[0])); 352 assertEquals(now + 4, res[1].getTimestamp()); 353 assertEquals(now + 3, res[2].getTimestamp()); 354 assertTrue(CellUtil.isDelete(res[3])); 355 assertEquals(now + 2, res[4].getTimestamp()); 356 assertEquals(now + 1, res[5].getTimestamp()); 357 assertEquals(now, res[6].getTimestamp()); 358 } 359 } 360 361 @Test 362 public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Throwable { 363 final TableName exportTable = TableName.valueOf(name.getMethodName()); 364 TableDescriptor desc = 365 TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) 366 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5) 367 .setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 368 .build(); 369 UTIL.getAdmin().createTable(desc); 370 371 Table exportT = UTIL.getConnection().getTable(exportTable); 372 373 // Add first version of QUAL 374 Put p = new Put(ROW1); 375 p.addColumn(FAMILYA, QUAL, now, QUAL); 376 exportT.put(p); 377 378 // Add Delete family marker 379 Delete d = new Delete(ROW1, now + 3); 380 exportT.delete(d); 381 382 // Add second version of QUAL 383 p = new Put(ROW1); 384 p.addColumn(FAMILYA, QUAL, now + 5, "s".getBytes()); 385 exportT.put(p); 386 387 // Add second Delete family marker 388 d = new Delete(ROW1, now + 7); 389 exportT.delete(d); 390 391 String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true", 392 exportTable.getNameAsString(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to 393 // export 394 }; 395 assertTrue(runExport(args)); 396 397 final String importTable = name.getMethodName() + "import"; 398 desc = TableDescriptorBuilder 399 .newBuilder(TableName.valueOf(importTable)).setColumnFamily(ColumnFamilyDescriptorBuilder 400 .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 401 .build(); 402 UTIL.getAdmin().createTable(desc); 403 404 Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable)); 405 args = new String[] { importTable, FQ_OUTPUT_DIR }; 406 assertTrue(runImport(args)); 407 408 Scan s = new Scan(); 409 s.setMaxVersions(); 410 s.setRaw(true); 411 412 ResultScanner importedTScanner = importT.getScanner(s); 413 Result importedTResult = importedTScanner.next(); 414 415 ResultScanner exportedTScanner = exportT.getScanner(s); 416 Result exportedTResult = exportedTScanner.next(); 417 try { 418 Result.compareResults(exportedTResult, importedTResult); 419 } catch (Throwable e) { 420 fail("Original and imported tables data comparision failed with error:" + e.getMessage()); 421 } finally { 422 exportT.close(); 423 importT.close(); 424 } 425 } 426 427 /** 428 * Create a simple table, run an Export Job on it, Import with filtering on, verify counts, 429 * attempt with invalid values. 430 */ 431 @Test 432 public void testWithFilter() throws Throwable { 433 // Create simple table to export 434 TableDescriptor desc = TableDescriptorBuilder 435 .newBuilder(TableName.valueOf(name.getMethodName())) 436 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build()) 437 .build(); 438 UTIL.getAdmin().createTable(desc); 439 Table exportTable = UTIL.getConnection().getTable(desc.getTableName()); 440 441 Put p1 = new Put(ROW1); 442 p1.addColumn(FAMILYA, QUAL, now, QUAL); 443 p1.addColumn(FAMILYA, QUAL, now + 1, QUAL); 444 p1.addColumn(FAMILYA, QUAL, now + 2, QUAL); 445 p1.addColumn(FAMILYA, QUAL, now + 3, QUAL); 446 p1.addColumn(FAMILYA, QUAL, now + 4, QUAL); 447 448 // Having another row would actually test the filter. 449 Put p2 = new Put(ROW2); 450 p2.addColumn(FAMILYA, QUAL, now, QUAL); 451 452 exportTable.put(Arrays.asList(p1, p2)); 453 454 // Export the simple table 455 String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" }; 456 assertTrue(runExport(args)); 457 458 // Import to a new table 459 final String IMPORT_TABLE = name.getMethodName() + "import"; 460 desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(IMPORT_TABLE)) 461 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build()) 462 .build(); 463 UTIL.getAdmin().createTable(desc); 464 465 Table importTable = UTIL.getConnection().getTable(desc.getTableName()); 466 args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(), 467 "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR, 468 "1000" }; 469 assertTrue(runImport(args)); 470 471 // get the count of the source table for that time range 472 PrefixFilter filter = new PrefixFilter(ROW1); 473 int count = getCount(exportTable, filter); 474 475 Assert.assertEquals("Unexpected row count between export and import tables", count, 476 getCount(importTable, null)); 477 478 // and then test that a broken command doesn't bork everything - easier here because we don't 479 // need to re-run the export job 480 481 args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(), 482 "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name.getMethodName(), 483 FQ_OUTPUT_DIR, "1000" }; 484 assertFalse(runImport(args)); 485 486 // cleanup 487 exportTable.close(); 488 importTable.close(); 489 } 490 491 /** 492 * Count the number of keyvalues in the specified table for the given timerange 493 */ 494 private int getCount(Table table, Filter filter) throws IOException { 495 Scan scan = new Scan(); 496 scan.setFilter(filter); 497 ResultScanner results = table.getScanner(scan); 498 int count = 0; 499 for (Result res : results) { 500 count += res.size(); 501 } 502 results.close(); 503 return count; 504 } 505 506 /** 507 * test main method. Import should print help and call System.exit 508 */ 509 @Test 510 public void testImportMain() throws Throwable { 511 PrintStream oldPrintStream = System.err; 512 SecurityManager SECURITY_MANAGER = System.getSecurityManager(); 513 LauncherSecurityManager newSecurityManager = new LauncherSecurityManager(); 514 System.setSecurityManager(newSecurityManager); 515 ByteArrayOutputStream data = new ByteArrayOutputStream(); 516 String[] args = {}; 517 System.setErr(new PrintStream(data)); 518 try { 519 System.setErr(new PrintStream(data)); 520 Import.main(args); 521 fail("should be SecurityException"); 522 } catch (SecurityException e) { 523 assertEquals(-1, newSecurityManager.getExitCode()); 524 assertTrue(data.toString().contains("Wrong number of arguments:")); 525 assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output")); 526 assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>")); 527 assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output")); 528 assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false")); 529 } finally { 530 System.setErr(oldPrintStream); 531 System.setSecurityManager(SECURITY_MANAGER); 532 } 533 } 534 535 @Test 536 public void testExportScan() throws Exception { 537 int version = 100; 538 long startTime = EnvironmentEdgeManager.currentTime(); 539 long endTime = startTime + 1; 540 String prefix = "row"; 541 String label_0 = "label_0"; 542 String label_1 = "label_1"; 543 String[] args = { "table", "outputDir", String.valueOf(version), String.valueOf(startTime), 544 String.valueOf(endTime), prefix }; 545 Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args); 546 assertEquals(version, scan.getMaxVersions()); 547 assertEquals(startTime, scan.getTimeRange().getMin()); 548 assertEquals(endTime, scan.getTimeRange().getMax()); 549 assertEquals(true, (scan.getFilter() instanceof PrefixFilter)); 550 assertEquals(0, 551 Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix))); 552 String[] argsWithLabels = 553 { "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1, "table", 554 "outputDir", String.valueOf(version), String.valueOf(startTime), String.valueOf(endTime), 555 prefix }; 556 Configuration conf = new Configuration(UTIL.getConfiguration()); 557 // parse the "-D" options 558 String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs(); 559 Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs); 560 assertEquals(version, scanWithLabels.getMaxVersions()); 561 assertEquals(startTime, scanWithLabels.getTimeRange().getMin()); 562 assertEquals(endTime, scanWithLabels.getTimeRange().getMax()); 563 assertEquals(true, (scanWithLabels.getFilter() instanceof PrefixFilter)); 564 assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(), 565 Bytes.toBytesBinary(prefix))); 566 assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size()); 567 assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0)); 568 assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1)); 569 } 570 571 /** 572 * test main method. Export should print help and call System.exit 573 */ 574 @Test 575 public void testExportMain() throws Throwable { 576 PrintStream oldPrintStream = System.err; 577 SecurityManager SECURITY_MANAGER = System.getSecurityManager(); 578 LauncherSecurityManager newSecurityManager = new LauncherSecurityManager(); 579 System.setSecurityManager(newSecurityManager); 580 ByteArrayOutputStream data = new ByteArrayOutputStream(); 581 String[] args = {}; 582 System.setErr(new PrintStream(data)); 583 try { 584 System.setErr(new PrintStream(data)); 585 runExportMain(args); 586 fail("should be SecurityException"); 587 } catch (SecurityException e) { 588 assertEquals(-1, newSecurityManager.getExitCode()); 589 String errMsg = data.toString(); 590 assertTrue(errMsg.contains("Wrong number of arguments:")); 591 assertTrue( 592 errMsg.contains("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " 593 + "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]")); 594 assertTrue(errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ...")); 595 assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true")); 596 assertTrue(errMsg.contains("-D hbase.client.scanner.caching=100")); 597 assertTrue(errMsg.contains("-D hbase.export.scanner.batch=10")); 598 assertTrue(errMsg.contains("-D hbase.export.scanner.caching=100")); 599 } finally { 600 System.setErr(oldPrintStream); 601 System.setSecurityManager(SECURITY_MANAGER); 602 } 603 } 604 605 /** 606 * Test map method of Importer 607 */ 608 @SuppressWarnings({ "unchecked", "rawtypes" }) 609 @Test 610 public void testKeyValueImporter() throws Throwable { 611 CellImporter importer = new CellImporter(); 612 Configuration configuration = new Configuration(); 613 Context ctx = mock(Context.class); 614 when(ctx.getConfiguration()).thenReturn(configuration); 615 616 doAnswer(new Answer<Void>() { 617 618 @Override 619 public Void answer(InvocationOnMock invocation) throws Throwable { 620 ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0]; 621 MapReduceExtendedCell key = (MapReduceExtendedCell) invocation.getArguments()[1]; 622 assertEquals("Key", Bytes.toString(writer.get())); 623 assertEquals("row", Bytes.toString(CellUtil.cloneRow(key))); 624 return null; 625 } 626 }).when(ctx).write(any(ImmutableBytesWritable.class), any(MapReduceExtendedCell.class)); 627 628 importer.setup(ctx); 629 Result value = mock(Result.class); 630 KeyValue[] keys = { 631 new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"), 632 Bytes.toBytes("value")), 633 new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"), 634 Bytes.toBytes("value1")) }; 635 when(value.rawCells()).thenReturn(keys); 636 importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx); 637 638 } 639 640 /** 641 * Test addFilterAndArguments method of Import This method set couple parameters into 642 * Configuration 643 */ 644 @Test 645 public void testAddFilterAndArguments() throws IOException { 646 Configuration configuration = new Configuration(); 647 648 List<String> args = new ArrayList<>(); 649 args.add("param1"); 650 args.add("param2"); 651 652 Import.addFilterAndArguments(configuration, FilterBase.class, args); 653 assertEquals("org.apache.hadoop.hbase.filter.FilterBase", 654 configuration.get(Import.FILTER_CLASS_CONF_KEY)); 655 assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY)); 656 } 657 658 @Test 659 public void testDurability() throws Throwable { 660 // Create an export table. 661 String exportTableName = name.getMethodName() + "export"; 662 try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) { 663 664 // Insert some data 665 Put put = new Put(ROW1); 666 put.addColumn(FAMILYA, QUAL, now, QUAL); 667 put.addColumn(FAMILYA, QUAL, now + 1, QUAL); 668 put.addColumn(FAMILYA, QUAL, now + 2, QUAL); 669 exportTable.put(put); 670 671 put = new Put(ROW2); 672 put.addColumn(FAMILYA, QUAL, now, QUAL); 673 put.addColumn(FAMILYA, QUAL, now + 1, QUAL); 674 put.addColumn(FAMILYA, QUAL, now + 2, QUAL); 675 exportTable.put(put); 676 677 // Run the export 678 String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000" }; 679 assertTrue(runExport(args)); 680 681 // Create the table for import 682 String importTableName = name.getMethodName() + "import1"; 683 Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); 684 685 // Register the wal listener for the import table 686 RegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() 687 .getRegions(importTable.getName()).get(0).getRegionInfo(); 688 TableWALActionListener walListener = new TableWALActionListener(region); 689 WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); 690 wal.registerWALActionsListener(walListener); 691 692 // Run the import with SKIP_WAL 693 args = new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(), 694 importTableName, FQ_OUTPUT_DIR }; 695 assertTrue(runImport(args)); 696 // Assert that the wal is not visisted 697 assertTrue(!walListener.isWALVisited()); 698 // Ensure that the count is 2 (only one version of key value is obtained) 699 assertTrue(getCount(importTable, null) == 2); 700 701 // Run the import with the default durability option 702 importTableName = name.getMethodName() + "import2"; 703 importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); 704 region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() 705 .getRegions(importTable.getName()).get(0).getRegionInfo(); 706 wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); 707 walListener = new TableWALActionListener(region); 708 wal.registerWALActionsListener(walListener); 709 args = new String[] { importTableName, FQ_OUTPUT_DIR }; 710 assertTrue(runImport(args)); 711 // Assert that the wal is visisted 712 assertTrue(walListener.isWALVisited()); 713 // Ensure that the count is 2 (only one version of key value is obtained) 714 assertTrue(getCount(importTable, null) == 2); 715 } 716 } 717 718 /** 719 * This listens to the {@link #visitLogEntryBeforeWrite(RegionInfo, WALKey, WALEdit)} to identify 720 * that an entry is written to the Write Ahead Log for the given table. 721 */ 722 private static class TableWALActionListener implements WALActionsListener { 723 724 private RegionInfo regionInfo; 725 private boolean isVisited = false; 726 727 public TableWALActionListener(RegionInfo region) { 728 this.regionInfo = region; 729 } 730 731 @Override 732 public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { 733 if ( 734 logKey.getTableName().getNameAsString() 735 .equalsIgnoreCase(this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit()) 736 ) { 737 isVisited = true; 738 } 739 } 740 741 public boolean isWALVisited() { 742 return isVisited; 743 } 744 } 745}