/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY;
import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests the table import and table export MR job functionality
 */
@Category({ VerySlowMapReduceTests.class, MediumTests.class })
public class TestImportExport {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestImportExport.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestImportExport.class);
  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1");
  private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2");
  private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3");
  private static final String FAMILYA_STRING = "a";
  private static final String FAMILYB_STRING = "b";
  private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
  private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
  private static final byte[] QUAL = Bytes.toBytes("q");
  private static final String OUTPUT_DIR = "outputdir";
  private static String FQ_OUTPUT_DIR;
  private static final String EXPORT_BATCH_SIZE = "100";

  private static final long now = EnvironmentEdgeManager.currentTime();
  private final TableName EXPORT_TABLE = TableName.valueOf("export_table");
  private final TableName IMPORT_TABLE = TableName.valueOf("import_table");
  public static final byte TEST_TAG_TYPE = (byte) (Tag.CUSTOM_TAG_TYPE_RANGE + 1);
  public static final String TEST_ATTR = "source_op";
  public static final String TEST_TAG = "test_tag";

  /**
   * Starts the shared mini cluster and resolves the fully qualified export output directory.
   */
  @BeforeClass
  public static void beforeClass() throws Throwable {
    // Up the handlers; this test needs more than usual.
    UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    UTIL.startMiniCluster();
    FQ_OUTPUT_DIR =
      new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
  }

  /**
   * Shuts the shared mini cluster down once all tests have run.
   */
  @AfterClass
  public static void afterClass() throws Throwable {
    UTIL.shutdownMiniCluster();
  }

  @Rule
  public final TestName name = new TestName();

  /**
   * Logs the name of the test method that is about to run.
   */
  @Before
  public void announce() {
    LOG.info("Running {}", name.getMethodName());
  }

  /**
   * Removes the export output directory and the {@code export_table} / {@code import_table}
   * tables, if present, after each test.
   */
  @After
  public void cleanup() throws Throwable {
    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
    fs.delete(new Path(OUTPUT_DIR), true);
    if (UTIL.getAdmin().tableExists(EXPORT_TABLE)) {
      UTIL.deleteTable(EXPORT_TABLE);
    }
    if (UTIL.getAdmin().tableExists(IMPORT_TABLE)) {
      UTIL.deleteTable(IMPORT_TABLE);
    }
  }

  /**
   * Runs an export job with the specified command line args
   * @return true if job completed successfully
   */
  protected boolean runExport(String[] args) throws Throwable {
    // Copy the configuration so that each job run uses its own temp dirs.
    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
    return status == 0;
  }

  /**
   * Invokes {@link Export#main(String[])} directly with the specified command line args.
   */
  protected void runExportMain(String[] args) throws Throwable {
    Export.main(args);
  }

  /**
   * Runs an import job with the specified command line args
   * @return true if job completed successfully
   */
  boolean runImport(String[] args) throws Throwable {
    // Copy the configuration so that each job run uses its own temp dirs.
    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
    return status == 0;
  }

  /**
   * Test simple replication case with column mapping
   */
  @Test
  public void testSimpleCase() throws Throwable {
    try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3)) {
      Put p = new Put(ROW1);
      p.addColumn(FAMILYA, QUAL, now, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
      t.put(p);
      p = new Put(ROW2);
      p.addColumn(FAMILYA, QUAL, now, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
      t.put(p);
      p = new Put(ROW3);
      p.addColumn(FAMILYA, QUAL, now, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
      t.put(p);
    }

    String[] args = new String[] {
      // Only export row1 & row2.
      "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
      "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3", name.getMethodName(), FQ_OUTPUT_DIR,
      "1000", // max number of key versions per key to export
    };
    assertTrue(runExport(args));

    final String importTableName = name.getMethodName() + "import";
    try (Table t = UTIL.createTable(TableName.valueOf(importTableName), FAMILYB, 3)) {
      args =
        new String[] { "-D" + Import.CF_RENAME_PROP + "=" + FAMILYA_STRING + ":" + FAMILYB_STRING,
          importTableName, FQ_OUTPUT_DIR };
      assertTrue(runImport(args));

      Get g = new Get(ROW1);
      g.readAllVersions();
      Result r = t.get(g);
      assertEquals(3, r.size());
      g = new Get(ROW2);
      g.readAllVersions();
      r = t.get(g);
      assertEquals(3, r.size());
      // ROW3 was outside the exported row range, so nothing must have been imported for it.
      g = new Get(ROW3);
      r = t.get(g);
      assertEquals(0, r.size());
    }
  }

  /**
   * Test export hbase:meta table
   */
  @Test
  public void testMetaExport() throws Throwable {
    String[] args =
      new String[] { TableName.META_TABLE_NAME.getNameAsString(), FQ_OUTPUT_DIR, "1", "0", "0" };
    assertTrue(runExport(args));
  }

  /**
   * Test import data from 0.94 exported file
   */
  @Test
  public void testImport94Table() throws Throwable {
    final String resourceName = "exportedTableIn94Format";
    URL url = TestImportExport.class.getResource(resourceName);
    // getResource returns null when the resource is missing; treat that like a missing file
    // instead of failing with a NullPointerException on url.toURI().
    if (url == null) {
      LOG.warn("FAILED TO FIND " + resourceName + "; skipping out on test");
      return;
    }
    File f = new File(url.toURI());
    if (!f.exists()) {
      LOG.warn("FAILED TO FIND " + f + "; skipping out on test");
      return;
    }
    LOG.info("FILE=" + f);
    Path importPath = new Path(f.toURI());
    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
    fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + resourceName));
    final String importTableName = resourceName;
    try (Table t = UTIL.createTable(TableName.valueOf(importTableName), Bytes.toBytes("f1"), 3)) {
      String[] args =
        new String[] { "-Dhbase.import.version=0.94", importTableName, FQ_OUTPUT_DIR };
      assertTrue(runImport(args));
      // @formatter:off
      // exportedTableIn94Format contains 5 rows
      // ROW COLUMN+CELL
      // r1 column=f1:c1, timestamp=1383766761171, value=val1
      // r2 column=f1:c1, timestamp=1383766771642, value=val2
      // r3 column=f1:c1, timestamp=1383766777615, value=val3
      // r4 column=f1:c1, timestamp=1383766785146, value=val4
      // r5 column=f1:c1, timestamp=1383766791506, value=val5
      // @formatter:on
      assertEquals(5, UTIL.countRows(t));
    }
  }

  /**
   * Test export scanner batching
   */
  @Test
  public void testExportScannerBatching() throws Throwable {
    TableDescriptor desc = TableDescriptorBuilder
      .newBuilder(TableName.valueOf(name.getMethodName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(1).build())
      .build();
    UTIL.getAdmin().createTable(desc);
    try (Table t = UTIL.getConnection().getTable(desc.getTableName())) {
      Put p = new Put(ROW1);
      p.addColumn(FAMILYA, QUAL, now, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
      t.put(p);
      // added scanner batching arg.
      String[] args = new String[] { "-D" + ExportUtils.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,
        name.getMethodName(), FQ_OUTPUT_DIR };
      assertTrue(runExport(args));

      FileSystem fs = FileSystem.get(UTIL.getConfiguration());
      fs.delete(new Path(FQ_OUTPUT_DIR), true);
    }
  }

  /**
   * Exports a table containing delete markers with a raw scan, imports it into a second table
   * and verifies that puts and delete markers come back in the expected order.
   */
  @Test
  public void testWithDeletes() throws Throwable {
    TableDescriptor desc =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
          .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
        .build();
    UTIL.getAdmin().createTable(desc);
    try (Table t = UTIL.getConnection().getTable(desc.getTableName())) {
      Put p = new Put(ROW1);
      p.addColumn(FAMILYA, QUAL, now, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
      t.put(p);

      Delete d = new Delete(ROW1, now + 3);
      t.delete(d);
      d = new Delete(ROW1);
      d.addColumns(FAMILYA, QUAL, now + 2);
      t.delete(d);
    }

    String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true", name.getMethodName(),
      FQ_OUTPUT_DIR, "1000", // max number of key versions per key to export
    };
    assertTrue(runExport(args));

    final String importTableName = name.getMethodName() + "import";
    desc = TableDescriptorBuilder
      .newBuilder(TableName.valueOf(importTableName)).setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build())
      .build();
    UTIL.getAdmin().createTable(desc);
    try (Table t = UTIL.getConnection().getTable(desc.getTableName())) {
      args = new String[] { importTableName, FQ_OUTPUT_DIR };
      assertTrue(runImport(args));

      Scan s = new Scan();
      s.readAllVersions();
      s.setRaw(true);
      // Close the scanner even when one of the assertions below fails.
      try (ResultScanner scanner = t.getScanner(s)) {
        Result r = scanner.next();
        ExtendedCell[] res = ClientInternalHelper.getExtendedRawCells(r);
        assertTrue(PrivateCellUtil.isDeleteFamily(res[0]));
        assertEquals(now + 4, res[1].getTimestamp());
        assertEquals(now + 3, res[2].getTimestamp());
        assertTrue(CellUtil.isDelete(res[3]));
        assertEquals(now + 2, res[4].getTimestamp());
        assertEquals(now + 1, res[5].getTimestamp());
        assertEquals(now, res[6].getTimestamp());
      }
    }
  }

  /**
   * Writes two versions of a cell interleaved with two delete family markers on the same row and
   * family, round-trips the table through export and import, and compares the first raw result
   * of both tables.
   */
  @Test
  public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Throwable {
    final TableName exportTable = TableName.valueOf(name.getMethodName());
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(exportTable)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
        .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
      .build();
    UTIL.getAdmin().createTable(desc);

    try (Table exportT = UTIL.getConnection().getTable(exportTable)) {
      // Add first version of QUAL
      Put p = new Put(ROW1);
      p.addColumn(FAMILYA, QUAL, now, QUAL);
      exportT.put(p);

      // Add Delete family marker
      Delete d = new Delete(ROW1, now + 3);
      exportT.delete(d);

      // Add second version of QUAL
      p = new Put(ROW1);
      p.addColumn(FAMILYA, QUAL, now + 5, Bytes.toBytes("s"));
      exportT.put(p);

      // Add second Delete family marker
      d = new Delete(ROW1, now + 7);
      exportT.delete(d);

      String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true",
        exportTable.getNameAsString(), FQ_OUTPUT_DIR,
        "1000", // max number of key versions per key to export
      };
      assertTrue(runExport(args));

      final String importTable = name.getMethodName() + "import";
      desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(importTable))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
          .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
        .build();
      UTIL.getAdmin().createTable(desc);

      try (Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable))) {
        args = new String[] { importTable, FQ_OUTPUT_DIR };
        assertTrue(runImport(args));

        Scan s = new Scan();
        s.readAllVersions();
        s.setRaw(true);

        try (ResultScanner importedTScanner = importT.getScanner(s);
          ResultScanner exportedTScanner = exportT.getScanner(s)) {
          Result importedTResult = importedTScanner.next();
          Result exportedTResult = exportedTScanner.next();
          try {
            Result.compareResults(exportedTResult, importedTResult);
          } catch (Exception e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new AssertionError(
              "Original and imported tables data comparision failed with error:" + e.getMessage(),
              e);
          }
        }
      }
    }
  }

  /**
   * Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
   * attempt with invalid values.
   */
  @Test
  public void testWithFilter() throws Throwable {
    // Create simple table to export
    TableDescriptor exportDesc = TableDescriptorBuilder
      .newBuilder(TableName.valueOf(name.getMethodName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
      .build();
    UTIL.getAdmin().createTable(exportDesc);

    try (Table exportTable = UTIL.getConnection().getTable(exportDesc.getTableName())) {
      Put p1 = new Put(ROW1);
      p1.addColumn(FAMILYA, QUAL, now, QUAL);
      p1.addColumn(FAMILYA, QUAL, now + 1, QUAL);
      p1.addColumn(FAMILYA, QUAL, now + 2, QUAL);
      p1.addColumn(FAMILYA, QUAL, now + 3, QUAL);
      p1.addColumn(FAMILYA, QUAL, now + 4, QUAL);

      // Having another row would actually test the filter.
      Put p2 = new Put(ROW2);
      p2.addColumn(FAMILYA, QUAL, now, QUAL);

      exportTable.put(Arrays.asList(p1, p2));

      // Export the simple table
      String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
      assertTrue(runExport(args));

      // Import to a new table
      final String importTableName = name.getMethodName() + "import";
      TableDescriptor importDesc =
        TableDescriptorBuilder.newBuilder(TableName.valueOf(importTableName))
          .setColumnFamily(
            ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
          .build();
      UTIL.getAdmin().createTable(importDesc);

      try (Table importTable = UTIL.getConnection().getTable(importDesc.getTableName())) {
        args =
          new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
            "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), importTableName,
            FQ_OUTPUT_DIR, "1000" };
        assertTrue(runImport(args));

        // Count the cells of the source table that match the same prefix filter
        PrefixFilter filter = new PrefixFilter(ROW1);
        int count = getCount(exportTable, filter);

        Assert.assertEquals("Unexpected row count between export and import tables", count,
          getCount(importTable, null));

        // and then test that a broken command doesn't bork everything - easier here because we
        // don't need to re-run the export job
        args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
          "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), name.getMethodName(),
          FQ_OUTPUT_DIR, "1000" };
        assertFalse(runImport(args));
      }
    }
  }

  /**
   * Create a simple table, run an Export Job on it, Import with bulk output and enable largeResult
   */
  @Test
  public void testBulkImportAndLargeResult() throws Throwable {
    // Create simple table to export
    TableDescriptor desc = TableDescriptorBuilder
      .newBuilder(TableName.valueOf(name.getMethodName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
      .build();
    UTIL.getAdmin().createTable(desc);

    // The table handle is only needed for loading the data, so close it right afterwards.
    try (Table exportTable = UTIL.getConnection().getTable(desc.getTableName())) {
      Put p1 = new Put(ROW1);
      p1.addColumn(FAMILYA, QUAL, now, QUAL);

      // Add a second row so the export contains more than one row.
      Put p2 = new Put(ROW2);
      p2.addColumn(FAMILYA, QUAL, now, QUAL);

      exportTable.put(Arrays.asList(p1, p2));
    }

    // Export the simple table
    String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
    assertTrue(runExport(args));

    // Import to a new table
    final String importTableName = name.getMethodName() + "import";
    desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(importTableName))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
      .build();
    UTIL.getAdmin().createTable(desc);

    String bulkOutputDir =
      new Path(OUTPUT_DIR + 1).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();

    args = new String[] { "-D" + Import.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputDir,
      "-D" + Import.HAS_LARGE_RESULT + "=" + true, importTableName, FQ_OUTPUT_DIR, "1000" };
    assertTrue(runImport(args));
  }

  /**
   * Count the number of keyvalues in the specified table with the given filter
   * @param table  the table to scan
   * @param filter the filter to apply to the scan, or {@code null} for an unfiltered scan
   * @return the number of keyvalues found
   */
  private int getCount(Table table, Filter filter) throws IOException {
    Scan scan = new Scan();
    scan.setFilter(filter);
    int count = 0;
    // try-with-resources releases the scanner even if iteration throws.
    try (ResultScanner results = table.getScanner(scan)) {
      for (Result res : results) {
        count += res.size();
      }
    }
    return count;
  }

  /**
   * test main method. Import should print help and call System.exit
   */
  @Test
  public void testImportMain() throws Throwable {
    PrintStream oldPrintStream = System.err;
    SecurityManager oldSecurityManager = System.getSecurityManager();
    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
    System.setSecurityManager(newSecurityManager);
    ByteArrayOutputStream data = new ByteArrayOutputStream();
    String[] args = {};
    try {
      System.setErr(new PrintStream(data));
      Import.main(args);
      fail("should be SecurityException");
    } catch (SecurityException e) {
      assertEquals(-1, newSecurityManager.getExitCode());
      String errMsg = data.toString();
      assertTrue(errMsg.contains("Wrong number of arguments:"));
      assertTrue(errMsg.contains("-Dimport.bulk.output=/path/for/output"));
      assertTrue(errMsg.contains("-Dimport.filter.class=<name of filter class>"));
      assertTrue(errMsg.contains("-Dmapreduce.reduce.speculative=false"));
    } finally {
      System.setErr(oldPrintStream);
      System.setSecurityManager(oldSecurityManager);
    }
  }

  /**
   * Verifies that {@link ExportUtils#getScanFromCommandLine(Configuration, String[])} maps the
   * command line arguments (versions, time range, prefix and visibility labels) onto the scan.
   */
  @Test
  public void testExportScan() throws Exception {
    int version = 100;
    long startTime = EnvironmentEdgeManager.currentTime();
    long endTime = startTime + 1;
    String prefix = "row";
    String label_0 = "label_0";
    String label_1 = "label_1";
    String[] args = { "table", "outputDir", String.valueOf(version), String.valueOf(startTime),
      String.valueOf(endTime), prefix };
    Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args);
    assertEquals(version, scan.getMaxVersions());
    assertEquals(startTime, scan.getTimeRange().getMin());
    assertEquals(endTime, scan.getTimeRange().getMax());
    assertTrue(scan.getFilter() instanceof PrefixFilter);
    assertEquals(0,
      Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
    String[] argsWithLabels =
      { "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1, "table",
        "outputDir", String.valueOf(version), String.valueOf(startTime), String.valueOf(endTime),
        prefix };
    Configuration conf = new Configuration(UTIL.getConfiguration());
    // parse the "-D" options
    String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs();
    Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs);
    assertEquals(version, scanWithLabels.getMaxVersions());
    assertEquals(startTime, scanWithLabels.getTimeRange().getMin());
    assertEquals(endTime, scanWithLabels.getTimeRange().getMax());
    assertTrue(scanWithLabels.getFilter() instanceof PrefixFilter);
    assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(),
      Bytes.toBytesBinary(prefix)));
    assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size());
    assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0));
    assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1));
  }

  /**
   * test main method. Export should print help and call System.exit
   */
  @Test
  public void testExportMain() throws Throwable {
    PrintStream oldPrintStream = System.err;
    SecurityManager oldSecurityManager = System.getSecurityManager();
    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
    System.setSecurityManager(newSecurityManager);
    ByteArrayOutputStream data = new ByteArrayOutputStream();
    String[] args = {};
    try {
      System.setErr(new PrintStream(data));
      runExportMain(args);
      fail("should be SecurityException");
    } catch (SecurityException e) {
      assertEquals(-1, newSecurityManager.getExitCode());
      String errMsg = data.toString();
      assertTrue(errMsg.contains("Wrong number of arguments:"));
      assertTrue(
        errMsg.contains("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> "
          + "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
      assertTrue(errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ..."));
      assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true"));
      assertTrue(errMsg.contains("-D hbase.client.scanner.caching=100"));
      assertTrue(errMsg.contains("-D hbase.export.scanner.batch=10"));
      assertTrue(errMsg.contains("-D hbase.export.scanner.caching=100"));
    } finally {
      System.setErr(oldPrintStream);
      System.setSecurityManager(oldSecurityManager);
    }
  }

  /**
   * Test map method of Importer
   */
  @SuppressWarnings({ "unchecked", "rawtypes" })
  @Test
  public void testKeyValueImporter() throws Throwable {
    CellImporter importer = new CellImporter();
    Configuration configuration = new Configuration();
    Context ctx = mock(Context.class);
    when(ctx.getConfiguration()).thenReturn(configuration);

    // Every write issued by the mapper must carry the input key and a cell of row "row".
    doAnswer(new Answer<Void>() {

      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArgument(0);
        MapReduceExtendedCell key = (MapReduceExtendedCell) invocation.getArgument(1);
        assertEquals("Key", Bytes.toString(writer.get()));
        assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
        return null;
      }
    }).when(ctx).write(any(), any());

    importer.setup(ctx);
    KeyValue[] keys = {
      new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
        Bytes.toBytes("value")),
      new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
        Bytes.toBytes("value1")) };
    Result value = Result.create(keys);
    importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
  }

  /**
   * Test addFilterAndArguments method of Import. This method sets a couple of parameters in the
   * Configuration.
   */
  @Test
  public void testAddFilterAndArguments() throws IOException {
    Configuration configuration = new Configuration();

    List<String> args = new ArrayList<>();
    args.add("param1");
    args.add("param2");

    Import.addFilterAndArguments(configuration, FilterBase.class, args);
    assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
      configuration.get(Import.FILTER_CLASS_CONF_KEY));
    assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
  }

  /**
   * Imports the same export twice, once with {@link Durability#SKIP_WAL} and once with the
   * default durability, and checks through a WAL listener whether the WAL was written.
   */
  @Test
  public void testDurability() throws Throwable {
    // Create an export table.
    String exportTableName = name.getMethodName() + "export";
    try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3)) {
      // Insert some data
      Put put = new Put(ROW1);
      put.addColumn(FAMILYA, QUAL, now, QUAL);
      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
      exportTable.put(put);

      put = new Put(ROW2);
      put.addColumn(FAMILYA, QUAL, now, QUAL);
      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
      exportTable.put(put);

      // Run the export
      String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000" };
      assertTrue(runExport(args));

      // Create the table for import
      String importTableName = name.getMethodName() + "import1";
      try (Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3)) {
        // Register the wal listener for the import table
        TableWALActionListener walListener = registerWalListener(importTable);

        // Run the import with SKIP_WAL
        args = new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
          importTableName, FQ_OUTPUT_DIR };
        assertTrue(runImport(args));
        // Assert that the wal is not visited
        assertFalse(walListener.isWALVisited());
        // Ensure that the count is 2 (only one version of key value is obtained)
        assertEquals(2, getCount(importTable, null));
      }

      // Run the import with the default durability option
      importTableName = name.getMethodName() + "import2";
      try (Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3)) {
        TableWALActionListener walListener = registerWalListener(importTable);
        args = new String[] { importTableName, FQ_OUTPUT_DIR };
        assertTrue(runImport(args));
        // Assert that the wal is visited
        assertTrue(walListener.isWALVisited());
        // Ensure that the count is 2 (only one version of key value is obtained)
        assertEquals(2, getCount(importTable, null));
      }
    }
  }

  /**
   * Registers a {@link TableWALActionListener} on the WAL that region server 0 returns for the
   * first region of the given table.
   * @param table the table whose WAL writes should be observed
   * @return the registered listener
   */
  private TableWALActionListener registerWalListener(Table table) throws IOException {
    RegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
      .getRegions(table.getName()).get(0).getRegionInfo();
    TableWALActionListener walListener = new TableWALActionListener(region);
    WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
    wal.registerWALActionsListener(walListener);
    return walListener;
  }

  /**
   * This listens to the {@link #visitLogEntryBeforeWrite(RegionInfo, WALKey, WALEdit)} to identify
   * that an entry is written to the Write Ahead Log for the given table.
   */
  private static class TableWALActionListener implements WALActionsListener {

    private final RegionInfo regionInfo;
    private boolean isVisited = false;

    public TableWALActionListener(RegionInfo region) {
      this.regionInfo = region;
    }

    @Override
    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
      // Only non-meta edits for the watched table count as a visit.
      if (
        logKey.getTableName().getNameAsString()
          .equalsIgnoreCase(this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())
      ) {
        isVisited = true;
      }
    }

    public boolean isWALVisited() {
      return isVisited;
    }
  }

  /**
   * Add cell tags to delete mutations, run export and import tool and verify that tags are present
   * in import table also.
   * @throws Throwable throws Throwable.
   */
  @Test
  public void testTagsAddition() throws Throwable {
    final TableName exportTable = TableName.valueOf(name.getMethodName());
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(exportTable)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
        .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
      .setCoprocessor(MetadataController.class.getName()).build();
    UTIL.getAdmin().createTable(desc);

    // The table handle is only needed for loading the data, so close it right afterwards.
    try (Table exportT = UTIL.getConnection().getTable(exportTable)) {
      // Add first version of QUAL
      Put p = new Put(ROW1);
      p.addColumn(FAMILYA, QUAL, now, QUAL);
      exportT.put(p);

      // Add Delete family marker
      Delete d = new Delete(ROW1, now + 3);
      // Add test attribute to delete mutation.
      d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG));
      exportT.delete(d);
    }

    // Run export tool with KeyValueCodecWithTags as Codec. This will ensure that export tool
    // will use KeyValueCodecWithTags.
    String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true",
      // This will make sure that codec will encode and decode tags in rpc call.
      "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
      exportTable.getNameAsString(), FQ_OUTPUT_DIR,
      "1000", // max number of key versions per key to export
    };
    assertTrue(runExport(args));
    // Assert tag exists in exportTable
    checkWhetherTagExists(exportTable, true);

    // Create an import table with MetadataController.
    final TableName importTable = TableName.valueOf("importWithTestTagsAddition");
    TableDescriptor importTableDesc = TableDescriptorBuilder.newBuilder(importTable)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
        .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
      .setCoprocessor(MetadataController.class.getName()).build();
    UTIL.getAdmin().createTable(importTableDesc);

    // Run import tool.
    args = new String[] {
      // This will make sure that codec will encode and decode tags in rpc call.
      "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
      importTable.getNameAsString(), FQ_OUTPUT_DIR };
    assertTrue(runImport(args));
    // Make sure that tags exists in imported table.
    checkWhetherTagExists(importTable, true);
  }

  /**
   * Scans the regions of the given table until one returns cells and asserts that at least one
   * delete cell is present and that its tags match the expectation.
   * @param table     the table to inspect
   * @param tagExists {@code true} to expect exactly one tag whose value is {@link #TEST_TAG} on
   *                  every delete cell, {@code false} to expect no tags
   */
  private void checkWhetherTagExists(TableName table, boolean tagExists) throws IOException {
    List<ExtendedCell> values = new ArrayList<>();
    for (HRegion region : UTIL.getHBaseCluster().getRegions(table)) {
      Scan scan = new Scan();
      // Make sure to set rawScan to true so that we will get Delete Markers.
      scan.setRaw(true);
      scan.readAllVersions();
      scan.withStartRow(ROW1);
      // Need to use RegionScanner instead of table#getScanner since the latter will
      // not return tags since it will go through rpc layer and remove tags intentionally.
      try (RegionScanner scanner = region.getScanner(scan)) {
        scanner.next(values);
      }
      if (!values.isEmpty()) {
        break;
      }
    }
    boolean deleteFound = false;
    for (ExtendedCell cell : values) {
      if (PrivateCellUtil.isDelete(cell.getType().getCode())) {
        deleteFound = true;
        List<Tag> tags = PrivateCellUtil.getTags(cell);
        // If tagExists flag is true then validate whether tag contents are as expected.
        if (tagExists) {
          Assert.assertEquals(1, tags.size());
          for (Tag tag : tags) {
            Assert.assertEquals(TEST_TAG, Tag.getValueAsString(tag));
          }
        } else {
          // If tagExists flag is disabled then check for 0 size tags.
          assertEquals(0, tags.size());
        }
      }
    }
    Assert.assertTrue(deleteFound);
  }

  /*
   * This co-proc will add a cell tag to delete mutation.
854 args = new String[] { 855 // This will make sure that codec will encode and decode tags in rpc call. 856 "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags", 857 importTable.getNameAsString(), FQ_OUTPUT_DIR }; 858 assertTrue(runImport(args)); 859 // Make sure that tags exists in imported table. 860 checkWhetherTagExists(importTable, true); 861 } 862 863 private void checkWhetherTagExists(TableName table, boolean tagExists) throws IOException { 864 List<ExtendedCell> values = new ArrayList<>(); 865 for (HRegion region : UTIL.getHBaseCluster().getRegions(table)) { 866 Scan scan = new Scan(); 867 // Make sure to set rawScan to true so that we will get Delete Markers. 868 scan.setRaw(true); 869 scan.readAllVersions(); 870 scan.withStartRow(ROW1); 871 // Need to use RegionScanner instead of table#getScanner since the latter will 872 // not return tags since it will go through rpc layer and remove tags intentionally. 873 RegionScanner scanner = region.getScanner(scan); 874 scanner.next(values); 875 if (!values.isEmpty()) { 876 break; 877 } 878 } 879 boolean deleteFound = false; 880 for (ExtendedCell cell : values) { 881 if (PrivateCellUtil.isDelete(cell.getType().getCode())) { 882 deleteFound = true; 883 List<Tag> tags = PrivateCellUtil.getTags(cell); 884 // If tagExists flag is true then validate whether tag contents are as expected. 885 if (tagExists) { 886 Assert.assertEquals(1, tags.size()); 887 for (Tag tag : tags) { 888 Assert.assertEquals(TEST_TAG, Tag.getValueAsString(tag)); 889 } 890 } else { 891 // If tagExists flag is disabled then check for 0 size tags. 892 assertEquals(0, tags.size()); 893 } 894 } 895 } 896 Assert.assertTrue(deleteFound); 897 } 898 899 /* 900 * This co-proc will add a cell tag to delete mutation. 
901 */ 902 public static class MetadataController implements RegionCoprocessor, RegionObserver { 903 @Override 904 public Optional<RegionObserver> getRegionObserver() { 905 return Optional.of(this); 906 } 907 908 @Override 909 public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 910 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 911 if (c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) { 912 return; 913 } 914 for (int i = 0; i < miniBatchOp.size(); i++) { 915 Mutation m = miniBatchOp.getOperation(i); 916 if (!(m instanceof Delete)) { 917 continue; 918 } 919 byte[] sourceOpAttr = m.getAttribute(TEST_ATTR); 920 if (sourceOpAttr == null) { 921 continue; 922 } 923 Tag sourceOpTag = new ArrayBackedTag(TEST_TAG_TYPE, sourceOpAttr); 924 List<Cell> updatedCells = new ArrayList<>(); 925 for (ExtendedCellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { 926 ExtendedCell cell = cellScanner.current(); 927 List<Tag> tags = PrivateCellUtil.getTags(cell); 928 tags.add(sourceOpTag); 929 Cell updatedCell = PrivateCellUtil.createCell(cell, tags); 930 updatedCells.add(updatedCell); 931 } 932 m.getFamilyCellMap().clear(); 933 // Clear and add new Cells to the Mutation. 934 for (Cell cell : updatedCells) { 935 Delete d = (Delete) m; 936 d.add(cell); 937 } 938 } 939 } 940 } 941 942 /** 943 * Set hbase.client.rpc.codec and hbase.client.default.rpc.codec both to empty string This means 944 * it will use no Codec. Make sure that we don't return Tags in response. 
945 * @throws Exception Exception 946 */ 947 @Test 948 public void testTagsWithEmptyCodec() throws Exception { 949 TableName tableName = TableName.valueOf(name.getMethodName()); 950 TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName) 951 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5) 952 .setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 953 .setCoprocessor(MetadataController.class.getName()).build(); 954 UTIL.getAdmin().createTable(tableDesc); 955 Configuration conf = new Configuration(UTIL.getConfiguration()); 956 conf.set(RPC_CODEC_CONF_KEY, ""); 957 conf.set(DEFAULT_CODEC_CLASS, ""); 958 try (Connection connection = ConnectionFactory.createConnection(conf); 959 Table table = connection.getTable(tableName)) { 960 // Add first version of QUAL 961 Put p = new Put(ROW1); 962 p.addColumn(FAMILYA, QUAL, now, QUAL); 963 table.put(p); 964 965 // Add Delete family marker 966 Delete d = new Delete(ROW1, now + 3); 967 // Add test attribute to delete mutation. 968 d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG)); 969 table.delete(d); 970 971 // Since RPC_CODEC_CONF_KEY and DEFAULT_CODEC_CLASS is set to empty, it will use 972 // empty Codec and it shouldn't encode/decode tags. 973 Scan scan = new Scan().withStartRow(ROW1).setRaw(true); 974 ResultScanner scanner = table.getScanner(scan); 975 int count = 0; 976 Result result; 977 while ((result = scanner.next()) != null) { 978 List<ExtendedCell> cells = Arrays.asList(ClientInternalHelper.getExtendedRawCells(result)); 979 assertEquals(2, cells.size()); 980 ExtendedCell cell = cells.get(0); 981 assertTrue(CellUtil.isDelete(cell)); 982 List<Tag> tags = PrivateCellUtil.getTags(cell); 983 assertEquals(0, tags.size()); 984 count++; 985 } 986 assertEquals(1, count); 987 } finally { 988 UTIL.deleteTable(tableName); 989 } 990 } 991}