001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY;
021import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026import static org.mockito.ArgumentMatchers.any;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.mock;
029import static org.mockito.Mockito.when;
030
031import java.io.ByteArrayOutputStream;
032import java.io.File;
033import java.io.IOException;
034import java.io.PrintStream;
035import java.net.URL;
036import java.util.ArrayList;
037import java.util.Arrays;
038import java.util.List;
039import java.util.Optional;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.ArrayBackedTag;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.CellScanner;
046import org.apache.hadoop.hbase.CellUtil;
047import org.apache.hadoop.hbase.HBaseClassTestRule;
048import org.apache.hadoop.hbase.HBaseTestingUtility;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.KeepDeletedCells;
051import org.apache.hadoop.hbase.KeyValue;
052import org.apache.hadoop.hbase.PrivateCellUtil;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.Tag;
055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
056import org.apache.hadoop.hbase.client.Connection;
057import org.apache.hadoop.hbase.client.ConnectionFactory;
058import org.apache.hadoop.hbase.client.Delete;
059import org.apache.hadoop.hbase.client.Durability;
060import org.apache.hadoop.hbase.client.Get;
061import org.apache.hadoop.hbase.client.Mutation;
062import org.apache.hadoop.hbase.client.Put;
063import org.apache.hadoop.hbase.client.RegionInfo;
064import org.apache.hadoop.hbase.client.Result;
065import org.apache.hadoop.hbase.client.ResultScanner;
066import org.apache.hadoop.hbase.client.Scan;
067import org.apache.hadoop.hbase.client.Table;
068import org.apache.hadoop.hbase.client.TableDescriptor;
069import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
070import org.apache.hadoop.hbase.coprocessor.ObserverContext;
071import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
072import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
073import org.apache.hadoop.hbase.coprocessor.RegionObserver;
074import org.apache.hadoop.hbase.filter.Filter;
075import org.apache.hadoop.hbase.filter.FilterBase;
076import org.apache.hadoop.hbase.filter.PrefixFilter;
077import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
078import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
079import org.apache.hadoop.hbase.regionserver.HRegion;
080import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
081import org.apache.hadoop.hbase.regionserver.RegionScanner;
082import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
083import org.apache.hadoop.hbase.testclassification.MediumTests;
084import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
085import org.apache.hadoop.hbase.util.Bytes;
086import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
087import org.apache.hadoop.hbase.util.LauncherSecurityManager;
088import org.apache.hadoop.hbase.wal.WAL;
089import org.apache.hadoop.hbase.wal.WALEdit;
090import org.apache.hadoop.hbase.wal.WALKey;
091import org.apache.hadoop.mapreduce.Mapper.Context;
092import org.apache.hadoop.util.GenericOptionsParser;
093import org.apache.hadoop.util.ToolRunner;
094import org.junit.After;
095import org.junit.AfterClass;
096import org.junit.Assert;
097import org.junit.Before;
098import org.junit.BeforeClass;
099import org.junit.ClassRule;
100import org.junit.Rule;
101import org.junit.Test;
102import org.junit.experimental.categories.Category;
103import org.junit.rules.TestName;
104import org.mockito.invocation.InvocationOnMock;
105import org.mockito.stubbing.Answer;
106import org.slf4j.Logger;
107import org.slf4j.LoggerFactory;
108
109/**
110 * Tests the table import and table export MR job functionality
111 */
112@Category({ VerySlowMapReduceTests.class, MediumTests.class })
113// TODO : Remove this in 3.0
114public class TestImportExport {
115
116  @ClassRule
117  public static final HBaseClassTestRule CLASS_RULE =
118    HBaseClassTestRule.forClass(TestImportExport.class);
119
120  private static final Logger LOG = LoggerFactory.getLogger(TestImportExport.class);
121  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
122  private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1");
123  private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2");
124  private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3");
125  private static final String FAMILYA_STRING = "a";
126  private static final String FAMILYB_STRING = "b";
127  private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
128  private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
129  private static final byte[] QUAL = Bytes.toBytes("q");
130  private static final String OUTPUT_DIR = "outputdir";
131  private static String FQ_OUTPUT_DIR;
132  private static final String EXPORT_BATCH_SIZE = "100";
133
134  private static final long now = EnvironmentEdgeManager.currentTime();
135  private final TableName EXPORT_TABLE = TableName.valueOf("export_table");
136  private final TableName IMPORT_TABLE = TableName.valueOf("import_table");
137  public static final byte TEST_TAG_TYPE = (byte) (Tag.CUSTOM_TAG_TYPE_RANGE + 1);
138  public static final String TEST_ATTR = "source_op";
139  public static final String TEST_TAG = "test_tag";
140
141  @BeforeClass
142  public static void beforeClass() throws Throwable {
143    // Up the handlers; this test needs more than usual.
144    UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
145    UTIL.startMiniCluster();
146    FQ_OUTPUT_DIR =
147      new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
148  }
149
150  @AfterClass
151  public static void afterClass() throws Throwable {
152    UTIL.shutdownMiniCluster();
153  }
154
155  @Rule
156  public final TestName name = new TestName();
157
158  @Before
159  public void announce() {
160    LOG.info("Running " + name.getMethodName());
161  }
162
163  @After
164  public void cleanup() throws Throwable {
165    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
166    fs.delete(new Path(OUTPUT_DIR), true);
167    if (UTIL.getAdmin().tableExists(EXPORT_TABLE)) {
168      UTIL.deleteTable(EXPORT_TABLE);
169    }
170    if (UTIL.getAdmin().tableExists(IMPORT_TABLE)) {
171      UTIL.deleteTable(IMPORT_TABLE);
172    }
173  }
174
175  /**
176   * Runs an export job with the specified command line args
177   * @return true if job completed successfully
178   */
179  protected boolean runExport(String[] args) throws Throwable {
180    // need to make a copy of the configuration because to make sure different temp dirs are used.
181    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
182    return status == 0;
183  }
184
185  protected void runExportMain(String[] args) throws Throwable {
186    Export.main(args);
187  }
188
189  /**
190   * Runs an import job with the specified command line args
191   * @return true if job completed successfully
192   */
193  boolean runImport(String[] args) throws Throwable {
194    // need to make a copy of the configuration because to make sure different temp dirs are used.
195    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
196    return status == 0;
197  }
198
199  /**
200   * Test simple replication case with column mapping
201   */
202  @Test
203  public void testSimpleCase() throws Throwable {
204    try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3);) {
205      Put p = new Put(ROW1);
206      p.addColumn(FAMILYA, QUAL, now, QUAL);
207      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
208      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
209      t.put(p);
210      p = new Put(ROW2);
211      p.addColumn(FAMILYA, QUAL, now, QUAL);
212      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
213      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
214      t.put(p);
215      p = new Put(ROW3);
216      p.addColumn(FAMILYA, QUAL, now, QUAL);
217      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
218      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
219      t.put(p);
220    }
221
222    String[] args = new String[] {
223      // Only export row1 & row2.
224      "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
225      "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3", name.getMethodName(), FQ_OUTPUT_DIR,
226      "1000", // max number of key versions per key to export
227    };
228    assertTrue(runExport(args));
229
230    final String IMPORT_TABLE = name.getMethodName() + "import";
231    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) {
232      args =
233        new String[] { "-D" + Import.CF_RENAME_PROP + "=" + FAMILYA_STRING + ":" + FAMILYB_STRING,
234          IMPORT_TABLE, FQ_OUTPUT_DIR };
235      assertTrue(runImport(args));
236
237      Get g = new Get(ROW1);
238      g.setMaxVersions();
239      Result r = t.get(g);
240      assertEquals(3, r.size());
241      g = new Get(ROW2);
242      g.setMaxVersions();
243      r = t.get(g);
244      assertEquals(3, r.size());
245      g = new Get(ROW3);
246      r = t.get(g);
247      assertEquals(0, r.size());
248    }
249  }
250
251  /**
252   * Test export hbase:meta table
253   */
254  @Test
255  public void testMetaExport() throws Throwable {
256    String[] args =
257      new String[] { TableName.META_TABLE_NAME.getNameAsString(), FQ_OUTPUT_DIR, "1", "0", "0" };
258    assertTrue(runExport(args));
259  }
260
261  /**
262   * Test import data from 0.94 exported file
263   */
264  @Test
265  public void testImport94Table() throws Throwable {
266    final String name = "exportedTableIn94Format";
267    URL url = TestImportExport.class.getResource(name);
268    File f = new File(url.toURI());
269    if (!f.exists()) {
270      LOG.warn("FAILED TO FIND " + f + "; skipping out on test");
271      return;
272    }
273    assertTrue(f.exists());
274    LOG.info("FILE=" + f);
275    Path importPath = new Path(f.toURI());
276    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
277    fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name));
278    String IMPORT_TABLE = name;
279    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) {
280      String[] args = new String[] { "-Dhbase.import.version=0.94", IMPORT_TABLE, FQ_OUTPUT_DIR };
281      assertTrue(runImport(args));
282      // @formatter:off
283      // exportedTableIn94Format contains 5 rows
284      // ROW         COLUMN+CELL
285      // r1          column=f1:c1, timestamp=1383766761171, value=val1
286      // r2          column=f1:c1, timestamp=1383766771642, value=val2
287      // r3          column=f1:c1, timestamp=1383766777615, value=val3
288      // r4          column=f1:c1, timestamp=1383766785146, value=val4
289      // r5          column=f1:c1, timestamp=1383766791506, value=val5
290      // @formatter:on
291      assertEquals(5, UTIL.countRows(t));
292    }
293  }
294
295  /**
296   * Test export scanner batching
297   */
298  @Test
299  public void testExportScannerBatching() throws Throwable {
300    TableDescriptor desc = TableDescriptorBuilder
301      .newBuilder(TableName.valueOf(name.getMethodName()))
302      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(1).build())
303      .build();
304    UTIL.getAdmin().createTable(desc);
305    try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
306
307      Put p = new Put(ROW1);
308      p.addColumn(FAMILYA, QUAL, now, QUAL);
309      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
310      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
311      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
312      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
313      t.put(p);
314      // added scanner batching arg.
315      String[] args = new String[] { "-D" + ExportUtils.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,
316        name.getMethodName(), FQ_OUTPUT_DIR };
317      assertTrue(runExport(args));
318
319      FileSystem fs = FileSystem.get(UTIL.getConfiguration());
320      fs.delete(new Path(FQ_OUTPUT_DIR), true);
321    }
322  }
323
324  @Test
325  public void testWithDeletes() throws Throwable {
326    TableDescriptor desc =
327      TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
328        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
329          .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
330        .build();
331    UTIL.getAdmin().createTable(desc);
332    try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
333
334      Put p = new Put(ROW1);
335      p.addColumn(FAMILYA, QUAL, now, QUAL);
336      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
337      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
338      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
339      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
340      t.put(p);
341
342      Delete d = new Delete(ROW1, now + 3);
343      t.delete(d);
344      d = new Delete(ROW1);
345      d.addColumns(FAMILYA, QUAL, now + 2);
346      t.delete(d);
347    }
348
349    String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true", name.getMethodName(),
350      FQ_OUTPUT_DIR, "1000", // max number of key versions per key to export
351    };
352    assertTrue(runExport(args));
353
354    final String IMPORT_TABLE = name.getMethodName() + "import";
355    desc = TableDescriptorBuilder
356      .newBuilder(TableName.valueOf(IMPORT_TABLE)).setColumnFamily(ColumnFamilyDescriptorBuilder
357        .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build())
358      .build();
359    UTIL.getAdmin().createTable(desc);
360    try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
361      args = new String[] { IMPORT_TABLE, FQ_OUTPUT_DIR };
362      assertTrue(runImport(args));
363
364      Scan s = new Scan();
365      s.setMaxVersions();
366      s.setRaw(true);
367      ResultScanner scanner = t.getScanner(s);
368      Result r = scanner.next();
369      Cell[] res = r.rawCells();
370      assertTrue(PrivateCellUtil.isDeleteFamily(res[0]));
371      assertEquals(now + 4, res[1].getTimestamp());
372      assertEquals(now + 3, res[2].getTimestamp());
373      assertTrue(CellUtil.isDelete(res[3]));
374      assertEquals(now + 2, res[4].getTimestamp());
375      assertEquals(now + 1, res[5].getTimestamp());
376      assertEquals(now, res[6].getTimestamp());
377    }
378  }
379
380  @Test
381  public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Throwable {
382    final TableName exportTable = TableName.valueOf(name.getMethodName());
383    TableDescriptor desc =
384      TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
385        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
386          .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
387        .build();
388    UTIL.getAdmin().createTable(desc);
389
390    Table exportT = UTIL.getConnection().getTable(exportTable);
391
392    // Add first version of QUAL
393    Put p = new Put(ROW1);
394    p.addColumn(FAMILYA, QUAL, now, QUAL);
395    exportT.put(p);
396
397    // Add Delete family marker
398    Delete d = new Delete(ROW1, now + 3);
399    exportT.delete(d);
400
401    // Add second version of QUAL
402    p = new Put(ROW1);
403    p.addColumn(FAMILYA, QUAL, now + 5, "s".getBytes());
404    exportT.put(p);
405
406    // Add second Delete family marker
407    d = new Delete(ROW1, now + 7);
408    exportT.delete(d);
409
410    String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true",
411      exportTable.getNameAsString(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to
412                                                            // export
413    };
414    assertTrue(runExport(args));
415
416    final String importTable = name.getMethodName() + "import";
417    desc = TableDescriptorBuilder
418      .newBuilder(TableName.valueOf(importTable)).setColumnFamily(ColumnFamilyDescriptorBuilder
419        .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build())
420      .build();
421    UTIL.getAdmin().createTable(desc);
422
423    Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable));
424    args = new String[] { importTable, FQ_OUTPUT_DIR };
425    assertTrue(runImport(args));
426
427    Scan s = new Scan();
428    s.setMaxVersions();
429    s.setRaw(true);
430
431    ResultScanner importedTScanner = importT.getScanner(s);
432    Result importedTResult = importedTScanner.next();
433
434    ResultScanner exportedTScanner = exportT.getScanner(s);
435    Result exportedTResult = exportedTScanner.next();
436    try {
437      Result.compareResults(exportedTResult, importedTResult);
438    } catch (Throwable e) {
439      fail("Original and imported tables data comparision failed with error:" + e.getMessage());
440    } finally {
441      exportT.close();
442      importT.close();
443    }
444  }
445
446  /**
447   * Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
448   * attempt with invalid values.
449   */
450  @Test
451  public void testWithFilter() throws Throwable {
452    // Create simple table to export
453    TableDescriptor desc = TableDescriptorBuilder
454      .newBuilder(TableName.valueOf(name.getMethodName()))
455      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
456      .build();
457    UTIL.getAdmin().createTable(desc);
458    Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
459
460    Put p1 = new Put(ROW1);
461    p1.addColumn(FAMILYA, QUAL, now, QUAL);
462    p1.addColumn(FAMILYA, QUAL, now + 1, QUAL);
463    p1.addColumn(FAMILYA, QUAL, now + 2, QUAL);
464    p1.addColumn(FAMILYA, QUAL, now + 3, QUAL);
465    p1.addColumn(FAMILYA, QUAL, now + 4, QUAL);
466
467    // Having another row would actually test the filter.
468    Put p2 = new Put(ROW2);
469    p2.addColumn(FAMILYA, QUAL, now, QUAL);
470
471    exportTable.put(Arrays.asList(p1, p2));
472
473    // Export the simple table
474    String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
475    assertTrue(runExport(args));
476
477    // Import to a new table
478    final String IMPORT_TABLE = name.getMethodName() + "import";
479    desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(IMPORT_TABLE))
480      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
481      .build();
482    UTIL.getAdmin().createTable(desc);
483
484    Table importTable = UTIL.getConnection().getTable(desc.getTableName());
485    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
486      "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR,
487      "1000" };
488    assertTrue(runImport(args));
489
490    // get the count of the source table for that time range
491    PrefixFilter filter = new PrefixFilter(ROW1);
492    int count = getCount(exportTable, filter);
493
494    Assert.assertEquals("Unexpected row count between export and import tables", count,
495      getCount(importTable, null));
496
497    // and then test that a broken command doesn't bork everything - easier here because we don't
498    // need to re-run the export job
499
500    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
501      "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name.getMethodName(),
502      FQ_OUTPUT_DIR, "1000" };
503    assertFalse(runImport(args));
504
505    // cleanup
506    exportTable.close();
507    importTable.close();
508  }
509
510  /**
511   * Count the number of keyvalues in the specified table with the given filter
512   * @param table the table to scan
513   * @return the number of keyvalues found
514   */
515  private int getCount(Table table, Filter filter) throws IOException {
516    Scan scan = new Scan();
517    scan.setFilter(filter);
518    ResultScanner results = table.getScanner(scan);
519    int count = 0;
520    for (Result res : results) {
521      count += res.size();
522    }
523    results.close();
524    return count;
525  }
526
527  /**
528   * test main method. Import should print help and call System.exit
529   */
530  @Test
531  public void testImportMain() throws Throwable {
532    PrintStream oldPrintStream = System.err;
533    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
534    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
535    System.setSecurityManager(newSecurityManager);
536    ByteArrayOutputStream data = new ByteArrayOutputStream();
537    String[] args = {};
538    System.setErr(new PrintStream(data));
539    try {
540      System.setErr(new PrintStream(data));
541      Import.main(args);
542      fail("should be SecurityException");
543    } catch (SecurityException e) {
544      assertEquals(-1, newSecurityManager.getExitCode());
545      assertTrue(data.toString().contains("Wrong number of arguments:"));
546      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
547      assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
548      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
549      assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
550    } finally {
551      System.setErr(oldPrintStream);
552      System.setSecurityManager(SECURITY_MANAGER);
553    }
554  }
555
556  @Test
557  public void testExportScan() throws Exception {
558    int version = 100;
559    long startTime = EnvironmentEdgeManager.currentTime();
560    long endTime = startTime + 1;
561    String prefix = "row";
562    String label_0 = "label_0";
563    String label_1 = "label_1";
564    String[] args = { "table", "outputDir", String.valueOf(version), String.valueOf(startTime),
565      String.valueOf(endTime), prefix };
566    Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args);
567    assertEquals(version, scan.getMaxVersions());
568    assertEquals(startTime, scan.getTimeRange().getMin());
569    assertEquals(endTime, scan.getTimeRange().getMax());
570    assertEquals(true, (scan.getFilter() instanceof PrefixFilter));
571    assertEquals(0,
572      Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
573    String[] argsWithLabels =
574      { "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1, "table",
575        "outputDir", String.valueOf(version), String.valueOf(startTime), String.valueOf(endTime),
576        prefix };
577    Configuration conf = new Configuration(UTIL.getConfiguration());
578    // parse the "-D" options
579    String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs();
580    Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs);
581    assertEquals(version, scanWithLabels.getMaxVersions());
582    assertEquals(startTime, scanWithLabels.getTimeRange().getMin());
583    assertEquals(endTime, scanWithLabels.getTimeRange().getMax());
584    assertEquals(true, (scanWithLabels.getFilter() instanceof PrefixFilter));
585    assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(),
586      Bytes.toBytesBinary(prefix)));
587    assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size());
588    assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0));
589    assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1));
590  }
591
592  /**
593   * test main method. Export should print help and call System.exit
594   */
595  @Test
596  public void testExportMain() throws Throwable {
597    PrintStream oldPrintStream = System.err;
598    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
599    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
600    System.setSecurityManager(newSecurityManager);
601    ByteArrayOutputStream data = new ByteArrayOutputStream();
602    String[] args = {};
603    System.setErr(new PrintStream(data));
604    try {
605      System.setErr(new PrintStream(data));
606      runExportMain(args);
607      fail("should be SecurityException");
608    } catch (SecurityException e) {
609      assertEquals(-1, newSecurityManager.getExitCode());
610      String errMsg = data.toString();
611      assertTrue(errMsg.contains("Wrong number of arguments:"));
612      assertTrue(
613        errMsg.contains("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> "
614          + "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
615      assertTrue(errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ..."));
616      assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true"));
617      assertTrue(errMsg.contains("-D hbase.client.scanner.caching=100"));
618      assertTrue(errMsg.contains("-D hbase.export.scanner.batch=10"));
619      assertTrue(errMsg.contains("-D hbase.export.scanner.caching=100"));
620    } finally {
621      System.setErr(oldPrintStream);
622      System.setSecurityManager(SECURITY_MANAGER);
623    }
624  }
625
626  /**
627   * Test map method of Importer
628   */
629  @SuppressWarnings({ "unchecked", "rawtypes" })
630  @Test
631  public void testKeyValueImporter() throws Throwable {
632    KeyValueImporter importer = new KeyValueImporter();
633    Configuration configuration = new Configuration();
634    Context ctx = mock(Context.class);
635    when(ctx.getConfiguration()).thenReturn(configuration);
636
637    doAnswer(new Answer<Void>() {
638
639      @Override
640      public Void answer(InvocationOnMock invocation) throws Throwable {
641        ImmutableBytesWritable writer = invocation.getArgument(0);
642        KeyValue key = invocation.getArgument(1);
643        assertEquals("Key", Bytes.toString(writer.get()));
644        assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
645        return null;
646      }
647    }).when(ctx).write(any(), any());
648
649    importer.setup(ctx);
650    Result value = mock(Result.class);
651    KeyValue[] keys = {
652      new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
653        Bytes.toBytes("value")),
654      new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
655        Bytes.toBytes("value1")) };
656    when(value.rawCells()).thenReturn(keys);
657    importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
658
659  }
660
661  /**
662   * Test addFilterAndArguments method of Import This method set couple parameters into
663   * Configuration
664   */
665  @Test
666  public void testAddFilterAndArguments() throws IOException {
667    Configuration configuration = new Configuration();
668
669    List<String> args = new ArrayList<>();
670    args.add("param1");
671    args.add("param2");
672
673    Import.addFilterAndArguments(configuration, FilterBase.class, args);
674    assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
675      configuration.get(Import.FILTER_CLASS_CONF_KEY));
676    assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
677  }
678
679  @Test
680  public void testDurability() throws Throwable {
681    // Create an export table.
682    String exportTableName = name.getMethodName() + "export";
683    try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) {
684
685      // Insert some data
686      Put put = new Put(ROW1);
687      put.addColumn(FAMILYA, QUAL, now, QUAL);
688      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
689      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
690      exportTable.put(put);
691
692      put = new Put(ROW2);
693      put.addColumn(FAMILYA, QUAL, now, QUAL);
694      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
695      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
696      exportTable.put(put);
697
698      // Run the export
699      String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000" };
700      assertTrue(runExport(args));
701
702      // Create the table for import
703      String importTableName = name.getMethodName() + "import1";
704      Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
705
706      // Register the wal listener for the import table
707      RegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
708        .getRegions(importTable.getName()).get(0).getRegionInfo();
709      TableWALActionListener walListener = new TableWALActionListener(region);
710      WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
711      wal.registerWALActionsListener(walListener);
712
713      // Run the import with SKIP_WAL
714      args = new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
715        importTableName, FQ_OUTPUT_DIR };
716      assertTrue(runImport(args));
717      // Assert that the wal is not visisted
718      assertTrue(!walListener.isWALVisited());
719      // Ensure that the count is 2 (only one version of key value is obtained)
720      assertTrue(getCount(importTable, null) == 2);
721
722      // Run the import with the default durability option
723      importTableName = name.getMethodName() + "import2";
724      importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
725      region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
726        .getRegions(importTable.getName()).get(0).getRegionInfo();
727      wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
728      walListener = new TableWALActionListener(region);
729      wal.registerWALActionsListener(walListener);
730      args = new String[] { importTableName, FQ_OUTPUT_DIR };
731      assertTrue(runImport(args));
732      // Assert that the wal is visisted
733      assertTrue(walListener.isWALVisited());
734      // Ensure that the count is 2 (only one version of key value is obtained)
735      assertTrue(getCount(importTable, null) == 2);
736    }
737  }
738
739  /**
740   * This listens to the {@link #visitLogEntryBeforeWrite(RegionInfo, WALKey, WALEdit)} to identify
741   * that an entry is written to the Write Ahead Log for the given table.
742   */
743  private static class TableWALActionListener implements WALActionsListener {
744
745    private RegionInfo regionInfo;
746    private boolean isVisited = false;
747
748    public TableWALActionListener(RegionInfo region) {
749      this.regionInfo = region;
750    }
751
752    @Override
753    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
754      if (
755        logKey.getTableName().getNameAsString()
756          .equalsIgnoreCase(this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())
757      ) {
758        isVisited = true;
759      }
760    }
761
762    public boolean isWALVisited() {
763      return isVisited;
764    }
765  }
766
767  /**
768   * Add cell tags to delete mutations, run export and import tool and verify that tags are present
769   * in import table also.
770   * @throws Throwable throws Throwable.
771   */
772  @Test
773  public void testTagsAddition() throws Throwable {
774    final TableName exportTable = TableName.valueOf(name.getMethodName());
775    TableDescriptor desc = TableDescriptorBuilder.newBuilder(exportTable)
776      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
777        .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
778      .setCoprocessor(MetadataController.class.getName()).build();
779    UTIL.getAdmin().createTable(desc);
780
781    Table exportT = UTIL.getConnection().getTable(exportTable);
782
783    // Add first version of QUAL
784    Put p = new Put(ROW1);
785    p.addColumn(FAMILYA, QUAL, now, QUAL);
786    exportT.put(p);
787
788    // Add Delete family marker
789    Delete d = new Delete(ROW1, now + 3);
790    // Add test attribute to delete mutation.
791    d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG));
792    exportT.delete(d);
793
794    // Run export tool with KeyValueCodecWithTags as Codec. This will ensure that export tool
795    // will use KeyValueCodecWithTags.
796    String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true",
797      // This will make sure that codec will encode and decode tags in rpc call.
798      "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
799      exportTable.getNameAsString(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to
800                                                            // export
801    };
802    assertTrue(runExport(args));
803    // Assert tag exists in exportTable
804    checkWhetherTagExists(exportTable, true);
805
806    // Create an import table with MetadataController.
807    final TableName importTable = TableName.valueOf("importWithTestTagsAddition");
808    TableDescriptor importTableDesc = TableDescriptorBuilder.newBuilder(importTable)
809      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
810        .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
811      .setCoprocessor(MetadataController.class.getName()).build();
812    UTIL.getAdmin().createTable(importTableDesc);
813
814    // Run import tool.
815    args = new String[] {
816      // This will make sure that codec will encode and decode tags in rpc call.
817      "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
818      importTable.getNameAsString(), FQ_OUTPUT_DIR };
819    assertTrue(runImport(args));
820    // Make sure that tags exists in imported table.
821    checkWhetherTagExists(importTable, true);
822  }
823
824  private void checkWhetherTagExists(TableName table, boolean tagExists) throws IOException {
825    List<Cell> values = new ArrayList<>();
826    for (HRegion region : UTIL.getHBaseCluster().getRegions(table)) {
827      Scan scan = new Scan();
828      // Make sure to set rawScan to true so that we will get Delete Markers.
829      scan.setRaw(true);
830      scan.readAllVersions();
831      scan.withStartRow(ROW1);
832      // Need to use RegionScanner instead of table#getScanner since the latter will
833      // not return tags since it will go through rpc layer and remove tags intentionally.
834      RegionScanner scanner = region.getScanner(scan);
835      scanner.next(values);
836      if (!values.isEmpty()) {
837        break;
838      }
839    }
840    boolean deleteFound = false;
841    for (Cell cell : values) {
842      if (PrivateCellUtil.isDelete(cell.getType().getCode())) {
843        deleteFound = true;
844        List<Tag> tags = PrivateCellUtil.getTags(cell);
845        // If tagExists flag is true then validate whether tag contents are as expected.
846        if (tagExists) {
847          Assert.assertEquals(1, tags.size());
848          for (Tag tag : tags) {
849            Assert.assertEquals(TEST_TAG, Tag.getValueAsString(tag));
850          }
851        } else {
852          // If tagExists flag is disabled then check for 0 size tags.
853          assertEquals(0, tags.size());
854        }
855      }
856    }
857    Assert.assertTrue(deleteFound);
858  }
859
860  /*
861   * This co-proc will add a cell tag to delete mutation.
862   */
863  public static class MetadataController implements RegionCoprocessor, RegionObserver {
864    @Override
865    public Optional<RegionObserver> getRegionObserver() {
866      return Optional.of(this);
867    }
868
869    @Override
870    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
871      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
872      if (c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) {
873        return;
874      }
875      for (int i = 0; i < miniBatchOp.size(); i++) {
876        Mutation m = miniBatchOp.getOperation(i);
877        if (!(m instanceof Delete)) {
878          continue;
879        }
880        byte[] sourceOpAttr = m.getAttribute(TEST_ATTR);
881        if (sourceOpAttr == null) {
882          continue;
883        }
884        Tag sourceOpTag = new ArrayBackedTag(TEST_TAG_TYPE, sourceOpAttr);
885        List<Cell> updatedCells = new ArrayList<>();
886        for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
887          Cell cell = cellScanner.current();
888          List<Tag> tags = PrivateCellUtil.getTags(cell);
889          tags.add(sourceOpTag);
890          Cell updatedCell = PrivateCellUtil.createCell(cell, tags);
891          updatedCells.add(updatedCell);
892        }
893        m.getFamilyCellMap().clear();
894        // Clear and add new Cells to the Mutation.
895        for (Cell cell : updatedCells) {
896          Delete d = (Delete) m;
897          d.add(cell);
898        }
899      }
900    }
901  }
902
903  /**
904   * Set hbase.client.rpc.codec and hbase.client.default.rpc.codec both to empty string This means
905   * it will use no Codec. Make sure that we don't return Tags in response.
906   * @throws Exception Exception
907   */
908  @Test
909  public void testTagsWithEmptyCodec() throws Exception {
910    TableName tableName = TableName.valueOf(name.getMethodName());
911    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
912      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
913        .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
914      .setCoprocessor(MetadataController.class.getName()).build();
915    UTIL.getAdmin().createTable(tableDesc);
916    Configuration conf = new Configuration(UTIL.getConfiguration());
917    conf.set(RPC_CODEC_CONF_KEY, "");
918    conf.set(DEFAULT_CODEC_CLASS, "");
919    try (Connection connection = ConnectionFactory.createConnection(conf);
920      Table table = connection.getTable(tableName)) {
921      // Add first version of QUAL
922      Put p = new Put(ROW1);
923      p.addColumn(FAMILYA, QUAL, now, QUAL);
924      table.put(p);
925
926      // Add Delete family marker
927      Delete d = new Delete(ROW1, now + 3);
928      // Add test attribute to delete mutation.
929      d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG));
930      table.delete(d);
931
932      // Since RPC_CODEC_CONF_KEY and DEFAULT_CODEC_CLASS is set to empty, it will use
933      // empty Codec and it shouldn't encode/decode tags.
934      Scan scan = new Scan().withStartRow(ROW1).setRaw(true);
935      ResultScanner scanner = table.getScanner(scan);
936      int count = 0;
937      Result result;
938      while ((result = scanner.next()) != null) {
939        List<Cell> cells = result.listCells();
940        assertEquals(2, cells.size());
941        Cell cell = cells.get(0);
942        assertTrue(CellUtil.isDelete(cell));
943        List<Tag> tags = PrivateCellUtil.getTags(cell);
944        assertEquals(0, tags.size());
945        count++;
946      }
947      assertEquals(1, count);
948    } finally {
949      UTIL.deleteTable(tableName);
950    }
951  }
952}