001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY;
021import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026import static org.mockito.ArgumentMatchers.any;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.mock;
029import static org.mockito.Mockito.when;
030
031import java.io.ByteArrayOutputStream;
032import java.io.File;
033import java.io.IOException;
034import java.io.PrintStream;
035import java.net.URL;
036import java.util.ArrayList;
037import java.util.Arrays;
038import java.util.List;
039import java.util.Optional;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.ArrayBackedTag;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.CellUtil;
046import org.apache.hadoop.hbase.ExtendedCell;
047import org.apache.hadoop.hbase.ExtendedCellScanner;
048import org.apache.hadoop.hbase.HBaseClassTestRule;
049import org.apache.hadoop.hbase.HBaseTestingUtil;
050import org.apache.hadoop.hbase.HConstants;
051import org.apache.hadoop.hbase.KeepDeletedCells;
052import org.apache.hadoop.hbase.KeyValue;
053import org.apache.hadoop.hbase.PrivateCellUtil;
054import org.apache.hadoop.hbase.TableName;
055import org.apache.hadoop.hbase.Tag;
056import org.apache.hadoop.hbase.client.ClientInternalHelper;
057import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
058import org.apache.hadoop.hbase.client.Connection;
059import org.apache.hadoop.hbase.client.ConnectionFactory;
060import org.apache.hadoop.hbase.client.Delete;
061import org.apache.hadoop.hbase.client.Durability;
062import org.apache.hadoop.hbase.client.Get;
063import org.apache.hadoop.hbase.client.Mutation;
064import org.apache.hadoop.hbase.client.Put;
065import org.apache.hadoop.hbase.client.RegionInfo;
066import org.apache.hadoop.hbase.client.Result;
067import org.apache.hadoop.hbase.client.ResultScanner;
068import org.apache.hadoop.hbase.client.Scan;
069import org.apache.hadoop.hbase.client.Table;
070import org.apache.hadoop.hbase.client.TableDescriptor;
071import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
072import org.apache.hadoop.hbase.coprocessor.ObserverContext;
073import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
074import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
075import org.apache.hadoop.hbase.coprocessor.RegionObserver;
076import org.apache.hadoop.hbase.filter.Filter;
077import org.apache.hadoop.hbase.filter.FilterBase;
078import org.apache.hadoop.hbase.filter.PrefixFilter;
079import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
080import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
081import org.apache.hadoop.hbase.regionserver.HRegion;
082import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
083import org.apache.hadoop.hbase.regionserver.RegionScanner;
084import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
085import org.apache.hadoop.hbase.testclassification.MediumTests;
086import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
087import org.apache.hadoop.hbase.util.Bytes;
088import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
089import org.apache.hadoop.hbase.util.LauncherSecurityManager;
090import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
091import org.apache.hadoop.hbase.wal.WAL;
092import org.apache.hadoop.hbase.wal.WALEdit;
093import org.apache.hadoop.hbase.wal.WALKey;
094import org.apache.hadoop.mapreduce.Mapper.Context;
095import org.apache.hadoop.util.GenericOptionsParser;
096import org.apache.hadoop.util.ToolRunner;
097import org.junit.After;
098import org.junit.AfterClass;
099import org.junit.Assert;
100import org.junit.Before;
101import org.junit.BeforeClass;
102import org.junit.ClassRule;
103import org.junit.Rule;
104import org.junit.Test;
105import org.junit.experimental.categories.Category;
106import org.junit.rules.TestName;
107import org.mockito.invocation.InvocationOnMock;
108import org.mockito.stubbing.Answer;
109import org.slf4j.Logger;
110import org.slf4j.LoggerFactory;
111
112/**
113 * Tests the table import and table export MR job functionality
114 */
115@Category({ VerySlowMapReduceTests.class, MediumTests.class })
116public class TestImportExport {
117
118  @ClassRule
119  public static final HBaseClassTestRule CLASS_RULE =
120    HBaseClassTestRule.forClass(TestImportExport.class);
121
122  private static final Logger LOG = LoggerFactory.getLogger(TestImportExport.class);
123  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
124  private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1");
125  private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2");
126  private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3");
127  private static final String FAMILYA_STRING = "a";
128  private static final String FAMILYB_STRING = "b";
129  private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
130  private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
131  private static final byte[] QUAL = Bytes.toBytes("q");
132  private static final String OUTPUT_DIR = "outputdir";
133  private static String FQ_OUTPUT_DIR;
134  private static final String EXPORT_BATCH_SIZE = "100";
135
136  private static final long now = EnvironmentEdgeManager.currentTime();
137  private final TableName EXPORT_TABLE = TableName.valueOf("export_table");
138  private final TableName IMPORT_TABLE = TableName.valueOf("import_table");
139  public static final byte TEST_TAG_TYPE = (byte) (Tag.CUSTOM_TAG_TYPE_RANGE + 1);
140  public static final String TEST_ATTR = "source_op";
141  public static final String TEST_TAG = "test_tag";
142
143  @BeforeClass
144  public static void beforeClass() throws Throwable {
145    // Up the handlers; this test needs more than usual.
146    UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
147    UTIL.startMiniCluster();
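    // Qualify the output dir against the test filesystem so the export/import jobs and the
    // cleanup in @After all resolve the same path.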
148    FQ_OUTPUT_DIR =
149      new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
150  }
151
152  @AfterClass
153  public static void afterClass() throws Throwable {
154    UTIL.shutdownMiniCluster();
155  }
156
157  @Rule
158  public final TestName name = new TestName();
159
160  @Before
161  public void announce() {
162    LOG.info("Running " + name.getMethodName());
163  }
164
165  @After
166  public void cleanup() throws Throwable {
167    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
168    fs.delete(new Path(OUTPUT_DIR), true);
169    if (UTIL.getAdmin().tableExists(EXPORT_TABLE)) {
170      UTIL.deleteTable(EXPORT_TABLE);
171    }
172    if (UTIL.getAdmin().tableExists(IMPORT_TABLE)) {
173      UTIL.deleteTable(IMPORT_TABLE);
174    }
175  }
176
177  /**
178   * Runs an export job with the specified command line args
179   * @return true if job completed successfully
180   */
181  protected boolean runExport(String[] args) throws Throwable {
    // Make a copy of the configuration to make sure different temp dirs are used.
183    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
184    return status == 0;
185  }
186
187  protected void runExportMain(String[] args) throws Throwable {
188    Export.main(args);
189  }
190
191  /**
192   * Runs an import job with the specified command line args
193   * @return true if job completed successfully
194   */
195  boolean runImport(String[] args) throws Throwable {
    // Make a copy of the configuration to make sure different temp dirs are used.
197    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
198    return status == 0;
199  }
200
201  /**
202   * Test simple replication case with column mapping
203   */
204  @Test
205  public void testSimpleCase() throws Throwable {
206    try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3)) {
207      Put p = new Put(ROW1);
208      p.addColumn(FAMILYA, QUAL, now, QUAL);
209      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
210      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
211      t.put(p);
212      p = new Put(ROW2);
213      p.addColumn(FAMILYA, QUAL, now, QUAL);
214      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
215      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
216      t.put(p);
217      p = new Put(ROW3);
218      p.addColumn(FAMILYA, QUAL, now, QUAL);
219      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
220      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
221      t.put(p);
222    }
223
224    String[] args = new String[] {
225      // Only export row1 & row2.
226      "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
227      "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3", name.getMethodName(), FQ_OUTPUT_DIR,
228      "1000", // max number of key versions per key to export
229    };
230    assertTrue(runExport(args));
231
232    final String IMPORT_TABLE = name.getMethodName() + "import";
233    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3)) {
234      args =
235        new String[] { "-D" + Import.CF_RENAME_PROP + "=" + FAMILYA_STRING + ":" + FAMILYB_STRING,
236          IMPORT_TABLE, FQ_OUTPUT_DIR };
237      assertTrue(runImport(args));
238
239      Get g = new Get(ROW1);
240      g.readAllVersions();
241      Result r = t.get(g);
242      assertEquals(3, r.size());
243      g = new Get(ROW2);
244      g.readAllVersions();
245      r = t.get(g);
246      assertEquals(3, r.size());
247      g = new Get(ROW3);
248      r = t.get(g);
249      assertEquals(0, r.size());
250    }
251  }
252
253  /**
254   * Test export hbase:meta table
255   */
256  @Test
257  public void testMetaExport() throws Throwable {
258    String[] args =
259      new String[] { TableName.META_TABLE_NAME.getNameAsString(), FQ_OUTPUT_DIR, "1", "0", "0" };
260    assertTrue(runExport(args));
261  }
262
263  /**
264   * Test import data from 0.94 exported file
265   */
266  @Test
267  public void testImport94Table() throws Throwable {
268    final String name = "exportedTableIn94Format";
269    URL url = TestImportExport.class.getResource(name);
270    File f = new File(url.toURI());
271    if (!f.exists()) {
272      LOG.warn("FAILED TO FIND " + f + "; skipping out on test");
273      return;
274    }
275    assertTrue(f.exists());
276    LOG.info("FILE=" + f);
277    Path importPath = new Path(f.toURI());
278    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
279    fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name));
280    String IMPORT_TABLE = name;
281    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3)) {
282      String[] args = new String[] { "-Dhbase.import.version=0.94", IMPORT_TABLE, FQ_OUTPUT_DIR };
283      assertTrue(runImport(args));
284      // @formatter:off
285      // exportedTableIn94Format contains 5 rows
286      // ROW         COLUMN+CELL
287      // r1          column=f1:c1, timestamp=1383766761171, value=val1
288      // r2          column=f1:c1, timestamp=1383766771642, value=val2
289      // r3          column=f1:c1, timestamp=1383766777615, value=val3
290      // r4          column=f1:c1, timestamp=1383766785146, value=val4
291      // r5          column=f1:c1, timestamp=1383766791506, value=val5
292      // @formatter:on
293      assertEquals(5, UTIL.countRows(t));
294    }
295  }
296
297  /**
298   * Test export scanner batching
299   */
300  @Test
301  public void testExportScannerBatching() throws Throwable {
302    TableDescriptor desc = TableDescriptorBuilder
303      .newBuilder(TableName.valueOf(name.getMethodName()))
304      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(1).build())
305      .build();
306    UTIL.getAdmin().createTable(desc);
307    try (Table t = UTIL.getConnection().getTable(desc.getTableName())) {
308      Put p = new Put(ROW1);
309      p.addColumn(FAMILYA, QUAL, now, QUAL);
310      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
311      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
312      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
313      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
314      t.put(p);
      // Add the scanner batching arg.
316      String[] args = new String[] { "-D" + ExportUtils.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,
317        name.getMethodName(), FQ_OUTPUT_DIR };
318      assertTrue(runExport(args));
319
320      FileSystem fs = FileSystem.get(UTIL.getConfiguration());
321      fs.delete(new Path(FQ_OUTPUT_DIR), true);
322    }
323  }
324
325  @Test
326  public void testWithDeletes() throws Throwable {
327    TableDescriptor desc =
328      TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
329        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
330          .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
331        .build();
332    UTIL.getAdmin().createTable(desc);
333    try (Table t = UTIL.getConnection().getTable(desc.getTableName())) {
334      Put p = new Put(ROW1);
335      p.addColumn(FAMILYA, QUAL, now, QUAL);
336      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
337      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
338      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
339      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
340      t.put(p);
341
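      // Delete the whole row up to ts now + 3 (a family delete marker), then delete all versions
      // of QUAL up to ts now + 2 (a column delete marker). KEEP_DELETED_CELLS keeps the masked
      // puts around and the raw scan below returns the delete markers as well.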
342      Delete d = new Delete(ROW1, now + 3);
343      t.delete(d);
344      d = new Delete(ROW1);
345      d.addColumns(FAMILYA, QUAL, now + 2);
346      t.delete(d);
347    }
348
349    String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true", name.getMethodName(),
350      FQ_OUTPUT_DIR, "1000", // max number of key versions per key to export
351    };
352    assertTrue(runExport(args));
353
354    final String IMPORT_TABLE = name.getMethodName() + "import";
355    desc = TableDescriptorBuilder
356      .newBuilder(TableName.valueOf(IMPORT_TABLE)).setColumnFamily(ColumnFamilyDescriptorBuilder
357        .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build())
358      .build();
359    UTIL.getAdmin().createTable(desc);
360    try (Table t = UTIL.getConnection().getTable(desc.getTableName())) {
361      args = new String[] { IMPORT_TABLE, FQ_OUTPUT_DIR };
362      assertTrue(runImport(args));
363
364      Scan s = new Scan();
365      s.readAllVersions();
366      s.setRaw(true);
367      ResultScanner scanner = t.getScanner(s);
368      Result r = scanner.next();
369      ExtendedCell[] res = ClientInternalHelper.getExtendedRawCells(r);
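      // The raw scan returns the delete markers inline: the family delete marker sorts first,
      // then the QUAL cells newest-first, with the column delete marker (ts now + 2) sorting
      // ahead of the puts it covers.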
370      assertTrue(PrivateCellUtil.isDeleteFamily(res[0]));
371      assertEquals(now + 4, res[1].getTimestamp());
372      assertEquals(now + 3, res[2].getTimestamp());
373      assertTrue(CellUtil.isDelete(res[3]));
374      assertEquals(now + 2, res[4].getTimestamp());
375      assertEquals(now + 1, res[5].getTimestamp());
376      assertEquals(now, res[6].getTimestamp());
377    }
378  }
379
380  @Test
381  public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Throwable {
382    final TableName exportTable = TableName.valueOf(name.getMethodName());
383    TableDescriptor desc =
384      TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
385        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
386          .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
387        .build();
388    UTIL.getAdmin().createTable(desc);
389
390    Table exportT = UTIL.getConnection().getTable(exportTable);
391
392    // Add first version of QUAL
393    Put p = new Put(ROW1);
394    p.addColumn(FAMILYA, QUAL, now, QUAL);
395    exportT.put(p);
396
397    // Add Delete family marker
398    Delete d = new Delete(ROW1, now + 3);
399    exportT.delete(d);
400
401    // Add second version of QUAL
402    p = new Put(ROW1);
403    p.addColumn(FAMILYA, QUAL, now + 5, Bytes.toBytes("s"));
404    exportT.put(p);
405
406    // Add second Delete family marker
407    d = new Delete(ROW1, now + 7);
408    exportT.delete(d);
409
410    String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true",
411      exportTable.getNameAsString(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to
412                                                            // export
413    };
414    assertTrue(runExport(args));
415
416    final String importTable = name.getMethodName() + "import";
417    desc = TableDescriptorBuilder
418      .newBuilder(TableName.valueOf(importTable)).setColumnFamily(ColumnFamilyDescriptorBuilder
419        .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build())
420      .build();
421    UTIL.getAdmin().createTable(desc);
422
423    Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable));
424    args = new String[] { importTable, FQ_OUTPUT_DIR };
425    assertTrue(runImport(args));
426
427    Scan s = new Scan();
428    s.readAllVersions();
429    s.setRaw(true);
430
431    ResultScanner importedTScanner = importT.getScanner(s);
432    Result importedTResult = importedTScanner.next();
433
434    ResultScanner exportedTScanner = exportT.getScanner(s);
435    Result exportedTResult = exportedTScanner.next();
436    try {
437      Result.compareResults(exportedTResult, importedTResult);
438    } catch (Throwable e) {
439      fail("Original and imported tables data comparision failed with error:" + e.getMessage());
440    } finally {
441      exportT.close();
442      importT.close();
443    }
444  }
445
446  /**
447   * Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
448   * attempt with invalid values.
449   */
450  @Test
451  public void testWithFilter() throws Throwable {
452    // Create simple table to export
453    TableDescriptor desc = TableDescriptorBuilder
454      .newBuilder(TableName.valueOf(name.getMethodName()))
455      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
456      .build();
457    UTIL.getAdmin().createTable(desc);
458    Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
459
460    Put p1 = new Put(ROW1);
461    p1.addColumn(FAMILYA, QUAL, now, QUAL);
462    p1.addColumn(FAMILYA, QUAL, now + 1, QUAL);
463    p1.addColumn(FAMILYA, QUAL, now + 2, QUAL);
464    p1.addColumn(FAMILYA, QUAL, now + 3, QUAL);
465    p1.addColumn(FAMILYA, QUAL, now + 4, QUAL);
466
467    // Having another row would actually test the filter.
468    Put p2 = new Put(ROW2);
469    p2.addColumn(FAMILYA, QUAL, now, QUAL);
470
471    exportTable.put(Arrays.asList(p1, p2));
472
473    // Export the simple table
474    String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
475    assertTrue(runExport(args));
476
477    // Import to a new table
478    final String IMPORT_TABLE = name.getMethodName() + "import";
479    desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(IMPORT_TABLE))
480      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
481      .build();
482    UTIL.getAdmin().createTable(desc);
483
484    Table importTable = UTIL.getConnection().getTable(desc.getTableName());
485    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
486      "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR,
487      "1000" };
488    assertTrue(runImport(args));
489
    // Get the count of cells in the source table matching the prefix filter
491    PrefixFilter filter = new PrefixFilter(ROW1);
492    int count = getCount(exportTable, filter);
493
494    Assert.assertEquals("Unexpected row count between export and import tables", count,
495      getCount(importTable, null));
496
    // Then verify that a broken filter configuration doesn't break everything; easier to test
    // here because we don't need to re-run the export job.
499
500    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
501      "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name.getMethodName(),
502      FQ_OUTPUT_DIR, "1000" };
503    assertFalse(runImport(args));
504
505    // cleanup
506    exportTable.close();
507    importTable.close();
508  }
509
510  /**
511   * Count the number of keyvalues in the specified table with the given filter
512   * @param table the table to scan
513   * @return the number of keyvalues found
514   */
515  private int getCount(Table table, Filter filter) throws IOException {
516    Scan scan = new Scan();
517    scan.setFilter(filter);
518    ResultScanner results = table.getScanner(scan);
519    int count = 0;
520    for (Result res : results) {
521      count += res.size();
522    }
523    results.close();
524    return count;
525  }
526
527  /**
528   * test main method. Import should print help and call System.exit
529   */
530  @Test
531  public void testImportMain() throws Throwable {
532    PrintStream oldPrintStream = System.err;
533    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
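    // LauncherSecurityManager records the exit code and throws SecurityException instead of
    // letting System.exit() terminate the JVM, so the help/exit path can be asserted.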
534    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
535    System.setSecurityManager(newSecurityManager);
536    ByteArrayOutputStream data = new ByteArrayOutputStream();
537    String[] args = {};
538    System.setErr(new PrintStream(data));
539    try {
541      Import.main(args);
542      fail("should be SecurityException");
543    } catch (SecurityException e) {
544      assertEquals(-1, newSecurityManager.getExitCode());
545      assertTrue(data.toString().contains("Wrong number of arguments:"));
546      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
547      assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
549      assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
550    } finally {
551      System.setErr(oldPrintStream);
552      System.setSecurityManager(SECURITY_MANAGER);
553    }
554  }
555
556  @Test
557  public void testExportScan() throws Exception {
558    int version = 100;
559    long startTime = EnvironmentEdgeManager.currentTime();
560    long endTime = startTime + 1;
561    String prefix = "row";
562    String label_0 = "label_0";
563    String label_1 = "label_1";
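    // Positional Export args: <tablename> <outputdir> [<versions> [<starttime> [<endtime>]]
    // [^[regex pattern] or [Prefix] to filter]]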
564    String[] args = { "table", "outputDir", String.valueOf(version), String.valueOf(startTime),
565      String.valueOf(endTime), prefix };
566    Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args);
567    assertEquals(version, scan.getMaxVersions());
568    assertEquals(startTime, scan.getTimeRange().getMin());
569    assertEquals(endTime, scan.getTimeRange().getMax());
    assertTrue(scan.getFilter() instanceof PrefixFilter);
571    assertEquals(0,
572      Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
573    String[] argsWithLabels =
574      { "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1, "table",
575        "outputDir", String.valueOf(version), String.valueOf(startTime), String.valueOf(endTime),
576        prefix };
577    Configuration conf = new Configuration(UTIL.getConfiguration());
578    // parse the "-D" options
579    String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs();
580    Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs);
581    assertEquals(version, scanWithLabels.getMaxVersions());
582    assertEquals(startTime, scanWithLabels.getTimeRange().getMin());
583    assertEquals(endTime, scanWithLabels.getTimeRange().getMax());
    assertTrue(scanWithLabels.getFilter() instanceof PrefixFilter);
585    assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(),
586      Bytes.toBytesBinary(prefix)));
587    assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size());
588    assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0));
589    assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1));
590  }
591
592  /**
593   * test main method. Export should print help and call System.exit
594   */
595  @Test
596  public void testExportMain() throws Throwable {
597    PrintStream oldPrintStream = System.err;
598    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
599    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
600    System.setSecurityManager(newSecurityManager);
601    ByteArrayOutputStream data = new ByteArrayOutputStream();
602    String[] args = {};
603    System.setErr(new PrintStream(data));
604    try {
606      runExportMain(args);
607      fail("should be SecurityException");
608    } catch (SecurityException e) {
609      assertEquals(-1, newSecurityManager.getExitCode());
610      String errMsg = data.toString();
611      assertTrue(errMsg.contains("Wrong number of arguments:"));
612      assertTrue(
613        errMsg.contains("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> "
614          + "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
615      assertTrue(errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ..."));
616      assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true"));
617      assertTrue(errMsg.contains("-D hbase.client.scanner.caching=100"));
618      assertTrue(errMsg.contains("-D hbase.export.scanner.batch=10"));
619      assertTrue(errMsg.contains("-D hbase.export.scanner.caching=100"));
620    } finally {
621      System.setErr(oldPrintStream);
622      System.setSecurityManager(SECURITY_MANAGER);
623    }
624  }
625
626  /**
627   * Test map method of Importer
628   */
629  @SuppressWarnings({ "unchecked", "rawtypes" })
630  @Test
631  public void testKeyValueImporter() throws Throwable {
632    CellImporter importer = new CellImporter();
633    Configuration configuration = new Configuration();
634    Context ctx = mock(Context.class);
635    when(ctx.getConfiguration()).thenReturn(configuration);
636
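    // The mocked write() asserts what CellImporter emits: the original input key ("Key") and
    // cells that still belong to row "row".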
637    doAnswer(new Answer<Void>() {
638
639      @Override
640      public Void answer(InvocationOnMock invocation) throws Throwable {
641        ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArgument(0);
642        MapReduceExtendedCell key = (MapReduceExtendedCell) invocation.getArgument(1);
643        assertEquals("Key", Bytes.toString(writer.get()));
644        assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
645        return null;
646      }
647    }).when(ctx).write(any(), any());
648
649    importer.setup(ctx);
650    KeyValue[] keys = {
651      new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
652        Bytes.toBytes("value")),
653      new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
654        Bytes.toBytes("value1")) };
655    Result value = Result.create(keys);
    importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
  }
659
660  /**
661   * Test addFilterAndArguments method of Import This method set couple parameters into
662   * Configuration
663   */
664  @Test
665  public void testAddFilterAndArguments() throws IOException {
666    Configuration configuration = new Configuration();
667
668    List<String> args = new ArrayList<>();
669    args.add("param1");
670    args.add("param2");
671
672    Import.addFilterAndArguments(configuration, FilterBase.class, args);
673    assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
674      configuration.get(Import.FILTER_CLASS_CONF_KEY));
675    assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
676  }
677
678  @Test
679  public void testDurability() throws Throwable {
680    // Create an export table.
681    String exportTableName = name.getMethodName() + "export";
682    try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3)) {
683      // Insert some data
684      Put put = new Put(ROW1);
685      put.addColumn(FAMILYA, QUAL, now, QUAL);
686      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
687      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
688      exportTable.put(put);
689
690      put = new Put(ROW2);
691      put.addColumn(FAMILYA, QUAL, now, QUAL);
692      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
693      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
694      exportTable.put(put);
695
696      // Run the export
697      String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000" };
698      assertTrue(runExport(args));
699
700      // Create the table for import
701      String importTableName = name.getMethodName() + "import1";
702      Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
703
704      // Register the wal listener for the import table
705      RegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
706        .getRegions(importTable.getName()).get(0).getRegionInfo();
707      TableWALActionListener walListener = new TableWALActionListener(region);
708      WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
709      wal.registerWALActionsListener(walListener);
710
711      // Run the import with SKIP_WAL
712      args = new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
713        importTableName, FQ_OUTPUT_DIR };
714      assertTrue(runImport(args));
      // Assert that the wal is not visited
      assertFalse(walListener.isWALVisited());
      // Ensure that the count is 2 (the default scan returns only the latest version per key)
      assertEquals(2, getCount(importTable, null));
719
720      // Run the import with the default durability option
721      importTableName = name.getMethodName() + "import2";
722      importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
723      region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
724        .getRegions(importTable.getName()).get(0).getRegionInfo();
725      wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
726      walListener = new TableWALActionListener(region);
727      wal.registerWALActionsListener(walListener);
728      args = new String[] { importTableName, FQ_OUTPUT_DIR };
729      assertTrue(runImport(args));
      // Assert that the wal is visited
      assertTrue(walListener.isWALVisited());
      // Ensure that the count is 2 (the default scan returns only the latest version per key)
      assertEquals(2, getCount(importTable, null));
734    }
735  }
736
737  /**
738   * This listens to the {@link #visitLogEntryBeforeWrite(RegionInfo, WALKey, WALEdit)} to identify
739   * that an entry is written to the Write Ahead Log for the given table.
740   */
741  private static class TableWALActionListener implements WALActionsListener {
742
743    private RegionInfo regionInfo;
744    private boolean isVisited = false;
745
746    public TableWALActionListener(RegionInfo region) {
747      this.regionInfo = region;
748    }
749
750    @Override
751    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
752      if (
753        logKey.getTableName().getNameAsString()
754          .equalsIgnoreCase(this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())
755      ) {
756        isVisited = true;
757      }
758    }
759
760    public boolean isWALVisited() {
761      return isVisited;
762    }
763  }
764
765  /**
766   * Add cell tags to delete mutations, run export and import tool and verify that tags are present
767   * in import table also.
768   * @throws Throwable throws Throwable.
769   */
770  @Test
771  public void testTagsAddition() throws Throwable {
772    final TableName exportTable = TableName.valueOf(name.getMethodName());
773    TableDescriptor desc = TableDescriptorBuilder.newBuilder(exportTable)
774      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
775        .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
776      .setCoprocessor(MetadataController.class.getName()).build();
777    UTIL.getAdmin().createTable(desc);
778
779    Table exportT = UTIL.getConnection().getTable(exportTable);
780
781    // Add first version of QUAL
782    Put p = new Put(ROW1);
783    p.addColumn(FAMILYA, QUAL, now, QUAL);
784    exportT.put(p);
785
786    // Add Delete family marker
787    Delete d = new Delete(ROW1, now + 3);
788    // Add test attribute to delete mutation.
789    d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG));
790    exportT.delete(d);
791
    // Run the export tool with KeyValueCodecWithTags as the codec so that cell tags survive the
    // RPC round trip.
794    String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true",
795      // This will make sure that codec will encode and decode tags in rpc call.
796      "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
797      exportTable.getNameAsString(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to
798                                                            // export
799    };
800    assertTrue(runExport(args));
801    // Assert tag exists in exportTable
802    checkWhetherTagExists(exportTable, true);
803
804    // Create an import table with MetadataController.
805    final TableName importTable = TableName.valueOf("importWithTestTagsAddition");
806    TableDescriptor importTableDesc = TableDescriptorBuilder.newBuilder(importTable)
807      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
808        .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
809      .setCoprocessor(MetadataController.class.getName()).build();
810    UTIL.getAdmin().createTable(importTableDesc);
811
812    // Run import tool.
813    args = new String[] {
814      // This will make sure that codec will encode and decode tags in rpc call.
815      "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
816      importTable.getNameAsString(), FQ_OUTPUT_DIR };
817    assertTrue(runImport(args));
818    // Make sure that tags exists in imported table.
819    checkWhetherTagExists(importTable, true);
820  }
821
822  private void checkWhetherTagExists(TableName table, boolean tagExists) throws IOException {
823    List<ExtendedCell> values = new ArrayList<>();
824    for (HRegion region : UTIL.getHBaseCluster().getRegions(table)) {
825      Scan scan = new Scan();
826      // Make sure to set rawScan to true so that we will get Delete Markers.
827      scan.setRaw(true);
828      scan.readAllVersions();
829      scan.withStartRow(ROW1);
      // Use a RegionScanner instead of table#getScanner: the latter goes through the RPC layer,
      // which intentionally strips tags from the response.
832      RegionScanner scanner = region.getScanner(scan);
833      scanner.next(values);
834      if (!values.isEmpty()) {
835        break;
836      }
837    }
838    boolean deleteFound = false;
839    for (ExtendedCell cell : values) {
840      if (PrivateCellUtil.isDelete(cell.getType().getCode())) {
841        deleteFound = true;
842        List<Tag> tags = PrivateCellUtil.getTags(cell);
843        // If tagExists flag is true then validate whether tag contents are as expected.
844        if (tagExists) {
845          Assert.assertEquals(1, tags.size());
846          for (Tag tag : tags) {
847            Assert.assertEquals(TEST_TAG, Tag.getValueAsString(tag));
848          }
849        } else {
850          // If tagExists flag is disabled then check for 0 size tags.
851          assertEquals(0, tags.size());
852        }
853      }
854    }
855    Assert.assertTrue(deleteFound);
856  }
857
858  /*
859   * This co-proc will add a cell tag to delete mutation.
860   */
861  public static class MetadataController implements RegionCoprocessor, RegionObserver {
862    @Override
863    public Optional<RegionObserver> getRegionObserver() {
864      return Optional.of(this);
865    }
866
867    @Override
868    public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c,
869      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
870      if (c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) {
871        return;
872      }
873      for (int i = 0; i < miniBatchOp.size(); i++) {
874        Mutation m = miniBatchOp.getOperation(i);
875        if (!(m instanceof Delete)) {
876          continue;
877        }
878        byte[] sourceOpAttr = m.getAttribute(TEST_ATTR);
879        if (sourceOpAttr == null) {
880          continue;
881        }
882        Tag sourceOpTag = new ArrayBackedTag(TEST_TAG_TYPE, sourceOpAttr);
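        // Cells are immutable, so rebuild each cell of the Delete with the extra tag appended.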
883        List<Cell> updatedCells = new ArrayList<>();
884        for (ExtendedCellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
885          ExtendedCell cell = cellScanner.current();
886          List<Tag> tags = PrivateCellUtil.getTags(cell);
887          tags.add(sourceOpTag);
888          Cell updatedCell = PrivateCellUtil.createCell(cell, tags);
889          updatedCells.add(updatedCell);
890        }
        // Clear the Mutation's family map and re-add the rebuilt Cells.
        m.getFamilyCellMap().clear();
        Delete d = (Delete) m;
        for (Cell cell : updatedCells) {
          d.add(cell);
        }
897      }
898    }
899  }
900
901  /**
902   * Set hbase.client.rpc.codec and hbase.client.default.rpc.codec both to empty string This means
903   * it will use no Codec. Make sure that we don't return Tags in response.
904   * @throws Exception Exception
905   */
906  @Test
907  public void testTagsWithEmptyCodec() throws Exception {
908    TableName tableName = TableName.valueOf(name.getMethodName());
909    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
910      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
911        .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
912      .setCoprocessor(MetadataController.class.getName()).build();
913    UTIL.getAdmin().createTable(tableDesc);
914    Configuration conf = new Configuration(UTIL.getConfiguration());
915    conf.set(RPC_CODEC_CONF_KEY, "");
916    conf.set(DEFAULT_CODEC_CLASS, "");
917    try (Connection connection = ConnectionFactory.createConnection(conf);
918      Table table = connection.getTable(tableName)) {
919      // Add first version of QUAL
920      Put p = new Put(ROW1);
921      p.addColumn(FAMILYA, QUAL, now, QUAL);
922      table.put(p);
923
924      // Add Delete family marker
925      Delete d = new Delete(ROW1, now + 3);
926      // Add test attribute to delete mutation.
927      d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG));
928      table.delete(d);
929
930      // Since RPC_CODEC_CONF_KEY and DEFAULT_CODEC_CLASS is set to empty, it will use
931      // empty Codec and it shouldn't encode/decode tags.
932      Scan scan = new Scan().withStartRow(ROW1).setRaw(true);
933      ResultScanner scanner = table.getScanner(scan);
934      int count = 0;
935      Result result;
936      while ((result = scanner.next()) != null) {
937        List<ExtendedCell> cells = Arrays.asList(ClientInternalHelper.getExtendedRawCells(result));
938        assertEquals(2, cells.size());
939        ExtendedCell cell = cells.get(0);
940        assertTrue(CellUtil.isDelete(cell));
941        List<Tag> tags = PrivateCellUtil.getTags(cell);
942        assertEquals(0, tags.size());
943        count++;
944      }
945      assertEquals(1, count);
946    } finally {
947      UTIL.deleteTable(tableName);
948    }
949  }
950}