001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.Mockito.doAnswer;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.when;
028
029import java.io.ByteArrayOutputStream;
030import java.io.File;
031import java.io.IOException;
032import java.io.PrintStream;
033import java.net.URL;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.List;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseTestingUtility;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.KeepDeletedCells;
046import org.apache.hadoop.hbase.KeyValue;
047import org.apache.hadoop.hbase.PrivateCellUtil;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
050import org.apache.hadoop.hbase.client.Delete;
051import org.apache.hadoop.hbase.client.Durability;
052import org.apache.hadoop.hbase.client.Get;
053import org.apache.hadoop.hbase.client.Put;
054import org.apache.hadoop.hbase.client.RegionInfo;
055import org.apache.hadoop.hbase.client.Result;
056import org.apache.hadoop.hbase.client.ResultScanner;
057import org.apache.hadoop.hbase.client.Scan;
058import org.apache.hadoop.hbase.client.Table;
059import org.apache.hadoop.hbase.client.TableDescriptor;
060import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
061import org.apache.hadoop.hbase.filter.Filter;
062import org.apache.hadoop.hbase.filter.FilterBase;
063import org.apache.hadoop.hbase.filter.PrefixFilter;
064import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
065import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
066import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
067import org.apache.hadoop.hbase.testclassification.MediumTests;
068import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
071import org.apache.hadoop.hbase.util.LauncherSecurityManager;
072import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
073import org.apache.hadoop.hbase.wal.WAL;
074import org.apache.hadoop.hbase.wal.WALEdit;
075import org.apache.hadoop.hbase.wal.WALKey;
076import org.apache.hadoop.mapreduce.Mapper.Context;
077import org.apache.hadoop.util.GenericOptionsParser;
078import org.apache.hadoop.util.ToolRunner;
079import org.junit.After;
080import org.junit.AfterClass;
081import org.junit.Assert;
082import org.junit.Before;
083import org.junit.BeforeClass;
084import org.junit.ClassRule;
085import org.junit.Rule;
086import org.junit.Test;
087import org.junit.experimental.categories.Category;
088import org.junit.rules.TestName;
089import org.mockito.invocation.InvocationOnMock;
090import org.mockito.stubbing.Answer;
091import org.slf4j.Logger;
092import org.slf4j.LoggerFactory;
093
094/**
095 * Tests the table import and table export MR job functionality
096 */
097@Category({ VerySlowMapReduceTests.class, MediumTests.class })
098public class TestCellBasedImportExport2 {
099
100  @ClassRule
101  public static final HBaseClassTestRule CLASS_RULE =
102    HBaseClassTestRule.forClass(TestCellBasedImportExport2.class);
103
104  private static final Logger LOG = LoggerFactory.getLogger(TestCellBasedImportExport2.class);
105  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
106  private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1");
107  private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2");
108  private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3");
109  private static final String FAMILYA_STRING = "a";
110  private static final String FAMILYB_STRING = "b";
111  private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
112  private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
113  private static final byte[] QUAL = Bytes.toBytes("q");
114  private static final String OUTPUT_DIR = "outputdir";
115  private static String FQ_OUTPUT_DIR;
116  private static final String EXPORT_BATCH_SIZE = "100";
117
118  private static final long now = EnvironmentEdgeManager.currentTime();
119  private final TableName EXPORT_TABLE = TableName.valueOf("export_table");
120  private final TableName IMPORT_TABLE = TableName.valueOf("import_table");
121
122  @BeforeClass
123  public static void beforeClass() throws Throwable {
124    // Up the handlers; this test needs more than usual.
125    UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
126    UTIL.startMiniCluster();
127    FQ_OUTPUT_DIR =
128      new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
129  }
130
131  @AfterClass
132  public static void afterClass() throws Throwable {
133    UTIL.shutdownMiniCluster();
134  }
135
136  @Rule
137  public final TestName name = new TestName();
138
139  @Before
140  public void announce() {
141    LOG.info("Running " + name.getMethodName());
142  }
143
144  @After
145  public void cleanup() throws Throwable {
146    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
147    fs.delete(new Path(OUTPUT_DIR), true);
148    if (UTIL.getAdmin().tableExists(EXPORT_TABLE)) {
149      UTIL.deleteTable(EXPORT_TABLE);
150    }
151    if (UTIL.getAdmin().tableExists(IMPORT_TABLE)) {
152      UTIL.deleteTable(IMPORT_TABLE);
153    }
154  }
155
156  /**
157   * Runs an export job with the specified command line args
158   * @return true if job completed successfully
159   */
160  protected boolean runExport(String[] args) throws Throwable {
161    // need to make a copy of the configuration because to make sure different temp dirs are used.
162    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
163    return status == 0;
164  }
165
166  protected void runExportMain(String[] args) throws Throwable {
167    Export.main(args);
168  }
169
170  /**
171   * Runs an import job with the specified command line args
172   * @return true if job completed successfully
173   */
174  boolean runImport(String[] args) throws Throwable {
175    // need to make a copy of the configuration because to make sure different temp dirs are used.
176    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
177    return status == 0;
178  }
179
180  /**
181   * Test simple replication case with column mapping
182   */
183  @Test
184  public void testSimpleCase() throws Throwable {
185    try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3);) {
186      Put p = new Put(ROW1);
187      p.addColumn(FAMILYA, QUAL, now, QUAL);
188      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
189      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
190      t.put(p);
191      p = new Put(ROW2);
192      p.addColumn(FAMILYA, QUAL, now, QUAL);
193      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
194      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
195      t.put(p);
196      p = new Put(ROW3);
197      p.addColumn(FAMILYA, QUAL, now, QUAL);
198      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
199      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
200      t.put(p);
201    }
202
203    String[] args = new String[] {
204      // Only export row1 & row2.
205      "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
206      "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3", name.getMethodName(), FQ_OUTPUT_DIR,
207      "1000", // max number of key versions per key to export
208    };
209    assertTrue(runExport(args));
210
211    final String IMPORT_TABLE = name.getMethodName() + "import";
212    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) {
213      args =
214        new String[] { "-D" + Import.CF_RENAME_PROP + "=" + FAMILYA_STRING + ":" + FAMILYB_STRING,
215          IMPORT_TABLE, FQ_OUTPUT_DIR };
216      assertTrue(runImport(args));
217
218      Get g = new Get(ROW1);
219      g.setMaxVersions();
220      Result r = t.get(g);
221      assertEquals(3, r.size());
222      g = new Get(ROW2);
223      g.setMaxVersions();
224      r = t.get(g);
225      assertEquals(3, r.size());
226      g = new Get(ROW3);
227      r = t.get(g);
228      assertEquals(0, r.size());
229    }
230  }
231
232  /**
233   * Test export hbase:meta table
234   */
235  @Test
236  public void testMetaExport() throws Throwable {
237    String[] args =
238      new String[] { TableName.META_TABLE_NAME.getNameAsString(), FQ_OUTPUT_DIR, "1", "0", "0" };
239    assertTrue(runExport(args));
240  }
241
242  /**
243   * Test import data from 0.94 exported file
244   */
245  @Test
246  public void testImport94Table() throws Throwable {
247    final String name = "exportedTableIn94Format";
248    URL url = TestCellBasedImportExport2.class.getResource(name);
249    File f = new File(url.toURI());
250    if (!f.exists()) {
251      LOG.warn("FAILED TO FIND " + f + "; skipping out on test");
252      return;
253    }
254    assertTrue(f.exists());
255    LOG.info("FILE=" + f);
256    Path importPath = new Path(f.toURI());
257    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
258    fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name));
259    String IMPORT_TABLE = name;
260    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) {
261      String[] args = new String[] { "-Dhbase.import.version=0.94", IMPORT_TABLE, FQ_OUTPUT_DIR };
262      assertTrue(runImport(args));
263      /*
264       * exportedTableIn94Format contains 5 rows ROW COLUMN+CELL r1 column=f1:c1,
265       * timestamp=1383766761171, value=val1 r2 column=f1:c1, timestamp=1383766771642, value=val2 r3
266       * column=f1:c1, timestamp=1383766777615, value=val3 r4 column=f1:c1, timestamp=1383766785146,
267       * value=val4 r5 column=f1:c1, timestamp=1383766791506, value=val5
268       */
269      assertEquals(5, UTIL.countRows(t));
270    }
271  }
272
273  /**
274   * Test export scanner batching
275   */
276  @Test
277  public void testExportScannerBatching() throws Throwable {
278    TableDescriptor desc = TableDescriptorBuilder
279      .newBuilder(TableName.valueOf(name.getMethodName()))
280      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(1).build())
281      .build();
282    UTIL.getAdmin().createTable(desc);
283    try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
284
285      Put p = new Put(ROW1);
286      p.addColumn(FAMILYA, QUAL, now, QUAL);
287      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
288      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
289      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
290      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
291      t.put(p);
292
293      String[] args = new String[] { "-D" + ExportUtils.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added
294                                                                                                   // scanner
295                                                                                                   // batching
296                                                                                                   // arg.
297        name.getMethodName(), FQ_OUTPUT_DIR };
298      assertTrue(runExport(args));
299
300      FileSystem fs = FileSystem.get(UTIL.getConfiguration());
301      fs.delete(new Path(FQ_OUTPUT_DIR), true);
302    }
303  }
304
305  @Test
306  public void testWithDeletes() throws Throwable {
307    TableDescriptor desc =
308      TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
309        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
310          .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
311        .build();
312    UTIL.getAdmin().createTable(desc);
313    try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
314
315      Put p = new Put(ROW1);
316      p.addColumn(FAMILYA, QUAL, now, QUAL);
317      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
318      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
319      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
320      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
321      t.put(p);
322
323      Delete d = new Delete(ROW1, now + 3);
324      t.delete(d);
325      d = new Delete(ROW1);
326      d.addColumns(FAMILYA, QUAL, now + 2);
327      t.delete(d);
328    }
329
330    String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true", name.getMethodName(),
331      FQ_OUTPUT_DIR, "1000", // max number of key versions per key to export
332    };
333    assertTrue(runExport(args));
334
335    final String IMPORT_TABLE = name.getMethodName() + "import";
336    desc = TableDescriptorBuilder
337      .newBuilder(TableName.valueOf(IMPORT_TABLE)).setColumnFamily(ColumnFamilyDescriptorBuilder
338        .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build())
339      .build();
340    UTIL.getAdmin().createTable(desc);
341    try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
342      args = new String[] { IMPORT_TABLE, FQ_OUTPUT_DIR };
343      assertTrue(runImport(args));
344
345      Scan s = new Scan();
346      s.setMaxVersions();
347      s.setRaw(true);
348      ResultScanner scanner = t.getScanner(s);
349      Result r = scanner.next();
350      Cell[] res = r.rawCells();
351      assertTrue(PrivateCellUtil.isDeleteFamily(res[0]));
352      assertEquals(now + 4, res[1].getTimestamp());
353      assertEquals(now + 3, res[2].getTimestamp());
354      assertTrue(CellUtil.isDelete(res[3]));
355      assertEquals(now + 2, res[4].getTimestamp());
356      assertEquals(now + 1, res[5].getTimestamp());
357      assertEquals(now, res[6].getTimestamp());
358    }
359  }
360
361  @Test
362  public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Throwable {
363    final TableName exportTable = TableName.valueOf(name.getMethodName());
364    TableDescriptor desc =
365      TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
366        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
367          .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
368        .build();
369    UTIL.getAdmin().createTable(desc);
370
371    Table exportT = UTIL.getConnection().getTable(exportTable);
372
373    // Add first version of QUAL
374    Put p = new Put(ROW1);
375    p.addColumn(FAMILYA, QUAL, now, QUAL);
376    exportT.put(p);
377
378    // Add Delete family marker
379    Delete d = new Delete(ROW1, now + 3);
380    exportT.delete(d);
381
382    // Add second version of QUAL
383    p = new Put(ROW1);
384    p.addColumn(FAMILYA, QUAL, now + 5, "s".getBytes());
385    exportT.put(p);
386
387    // Add second Delete family marker
388    d = new Delete(ROW1, now + 7);
389    exportT.delete(d);
390
391    String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true",
392      exportTable.getNameAsString(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to
393                                                            // export
394    };
395    assertTrue(runExport(args));
396
397    final String importTable = name.getMethodName() + "import";
398    desc = TableDescriptorBuilder
399      .newBuilder(TableName.valueOf(importTable)).setColumnFamily(ColumnFamilyDescriptorBuilder
400        .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build())
401      .build();
402    UTIL.getAdmin().createTable(desc);
403
404    Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable));
405    args = new String[] { importTable, FQ_OUTPUT_DIR };
406    assertTrue(runImport(args));
407
408    Scan s = new Scan();
409    s.setMaxVersions();
410    s.setRaw(true);
411
412    ResultScanner importedTScanner = importT.getScanner(s);
413    Result importedTResult = importedTScanner.next();
414
415    ResultScanner exportedTScanner = exportT.getScanner(s);
416    Result exportedTResult = exportedTScanner.next();
417    try {
418      Result.compareResults(exportedTResult, importedTResult);
419    } catch (Throwable e) {
420      fail("Original and imported tables data comparision failed with error:" + e.getMessage());
421    } finally {
422      exportT.close();
423      importT.close();
424    }
425  }
426
427  /**
428   * Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
429   * attempt with invalid values.
430   */
431  @Test
432  public void testWithFilter() throws Throwable {
433    // Create simple table to export
434    TableDescriptor desc = TableDescriptorBuilder
435      .newBuilder(TableName.valueOf(name.getMethodName()))
436      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
437      .build();
438    UTIL.getAdmin().createTable(desc);
439    Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
440
441    Put p1 = new Put(ROW1);
442    p1.addColumn(FAMILYA, QUAL, now, QUAL);
443    p1.addColumn(FAMILYA, QUAL, now + 1, QUAL);
444    p1.addColumn(FAMILYA, QUAL, now + 2, QUAL);
445    p1.addColumn(FAMILYA, QUAL, now + 3, QUAL);
446    p1.addColumn(FAMILYA, QUAL, now + 4, QUAL);
447
448    // Having another row would actually test the filter.
449    Put p2 = new Put(ROW2);
450    p2.addColumn(FAMILYA, QUAL, now, QUAL);
451
452    exportTable.put(Arrays.asList(p1, p2));
453
454    // Export the simple table
455    String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
456    assertTrue(runExport(args));
457
458    // Import to a new table
459    final String IMPORT_TABLE = name.getMethodName() + "import";
460    desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(IMPORT_TABLE))
461      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
462      .build();
463    UTIL.getAdmin().createTable(desc);
464
465    Table importTable = UTIL.getConnection().getTable(desc.getTableName());
466    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
467      "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR,
468      "1000" };
469    assertTrue(runImport(args));
470
471    // get the count of the source table for that time range
472    PrefixFilter filter = new PrefixFilter(ROW1);
473    int count = getCount(exportTable, filter);
474
475    Assert.assertEquals("Unexpected row count between export and import tables", count,
476      getCount(importTable, null));
477
478    // and then test that a broken command doesn't bork everything - easier here because we don't
479    // need to re-run the export job
480
481    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
482      "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name.getMethodName(),
483      FQ_OUTPUT_DIR, "1000" };
484    assertFalse(runImport(args));
485
486    // cleanup
487    exportTable.close();
488    importTable.close();
489  }
490
491  /**
492   * Count the number of keyvalues in the specified table for the given timerange
493   */
494  private int getCount(Table table, Filter filter) throws IOException {
495    Scan scan = new Scan();
496    scan.setFilter(filter);
497    ResultScanner results = table.getScanner(scan);
498    int count = 0;
499    for (Result res : results) {
500      count += res.size();
501    }
502    results.close();
503    return count;
504  }
505
506  /**
507   * test main method. Import should print help and call System.exit
508   */
509  @Test
510  public void testImportMain() throws Throwable {
511    PrintStream oldPrintStream = System.err;
512    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
513    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
514    System.setSecurityManager(newSecurityManager);
515    ByteArrayOutputStream data = new ByteArrayOutputStream();
516    String[] args = {};
517    System.setErr(new PrintStream(data));
518    try {
519      System.setErr(new PrintStream(data));
520      Import.main(args);
521      fail("should be SecurityException");
522    } catch (SecurityException e) {
523      assertEquals(-1, newSecurityManager.getExitCode());
524      assertTrue(data.toString().contains("Wrong number of arguments:"));
525      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
526      assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
527      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
528      assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
529    } finally {
530      System.setErr(oldPrintStream);
531      System.setSecurityManager(SECURITY_MANAGER);
532    }
533  }
534
535  @Test
536  public void testExportScan() throws Exception {
537    int version = 100;
538    long startTime = EnvironmentEdgeManager.currentTime();
539    long endTime = startTime + 1;
540    String prefix = "row";
541    String label_0 = "label_0";
542    String label_1 = "label_1";
543    String[] args = { "table", "outputDir", String.valueOf(version), String.valueOf(startTime),
544      String.valueOf(endTime), prefix };
545    Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args);
546    assertEquals(version, scan.getMaxVersions());
547    assertEquals(startTime, scan.getTimeRange().getMin());
548    assertEquals(endTime, scan.getTimeRange().getMax());
549    assertEquals(true, (scan.getFilter() instanceof PrefixFilter));
550    assertEquals(0,
551      Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
552    String[] argsWithLabels =
553      { "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1, "table",
554        "outputDir", String.valueOf(version), String.valueOf(startTime), String.valueOf(endTime),
555        prefix };
556    Configuration conf = new Configuration(UTIL.getConfiguration());
557    // parse the "-D" options
558    String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs();
559    Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs);
560    assertEquals(version, scanWithLabels.getMaxVersions());
561    assertEquals(startTime, scanWithLabels.getTimeRange().getMin());
562    assertEquals(endTime, scanWithLabels.getTimeRange().getMax());
563    assertEquals(true, (scanWithLabels.getFilter() instanceof PrefixFilter));
564    assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(),
565      Bytes.toBytesBinary(prefix)));
566    assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size());
567    assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0));
568    assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1));
569  }
570
571  /**
572   * test main method. Export should print help and call System.exit
573   */
574  @Test
575  public void testExportMain() throws Throwable {
576    PrintStream oldPrintStream = System.err;
577    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
578    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
579    System.setSecurityManager(newSecurityManager);
580    ByteArrayOutputStream data = new ByteArrayOutputStream();
581    String[] args = {};
582    System.setErr(new PrintStream(data));
583    try {
584      System.setErr(new PrintStream(data));
585      runExportMain(args);
586      fail("should be SecurityException");
587    } catch (SecurityException e) {
588      assertEquals(-1, newSecurityManager.getExitCode());
589      String errMsg = data.toString();
590      assertTrue(errMsg.contains("Wrong number of arguments:"));
591      assertTrue(
592        errMsg.contains("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> "
593          + "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
594      assertTrue(errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ..."));
595      assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true"));
596      assertTrue(errMsg.contains("-D hbase.client.scanner.caching=100"));
597      assertTrue(errMsg.contains("-D hbase.export.scanner.batch=10"));
598      assertTrue(errMsg.contains("-D hbase.export.scanner.caching=100"));
599    } finally {
600      System.setErr(oldPrintStream);
601      System.setSecurityManager(SECURITY_MANAGER);
602    }
603  }
604
605  /**
606   * Test map method of Importer
607   */
608  @SuppressWarnings({ "unchecked", "rawtypes" })
609  @Test
610  public void testKeyValueImporter() throws Throwable {
611    CellImporter importer = new CellImporter();
612    Configuration configuration = new Configuration();
613    Context ctx = mock(Context.class);
614    when(ctx.getConfiguration()).thenReturn(configuration);
615
616    doAnswer(new Answer<Void>() {
617
618      @Override
619      public Void answer(InvocationOnMock invocation) throws Throwable {
620        ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
621        MapReduceExtendedCell key = (MapReduceExtendedCell) invocation.getArguments()[1];
622        assertEquals("Key", Bytes.toString(writer.get()));
623        assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
624        return null;
625      }
626    }).when(ctx).write(any(ImmutableBytesWritable.class), any(MapReduceExtendedCell.class));
627
628    importer.setup(ctx);
629    Result value = mock(Result.class);
630    KeyValue[] keys = {
631      new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
632        Bytes.toBytes("value")),
633      new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
634        Bytes.toBytes("value1")) };
635    when(value.rawCells()).thenReturn(keys);
636    importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
637
638  }
639
640  /**
641   * Test addFilterAndArguments method of Import This method set couple parameters into
642   * Configuration
643   */
644  @Test
645  public void testAddFilterAndArguments() throws IOException {
646    Configuration configuration = new Configuration();
647
648    List<String> args = new ArrayList<>();
649    args.add("param1");
650    args.add("param2");
651
652    Import.addFilterAndArguments(configuration, FilterBase.class, args);
653    assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
654      configuration.get(Import.FILTER_CLASS_CONF_KEY));
655    assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
656  }
657
658  @Test
659  public void testDurability() throws Throwable {
660    // Create an export table.
661    String exportTableName = name.getMethodName() + "export";
662    try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) {
663
664      // Insert some data
665      Put put = new Put(ROW1);
666      put.addColumn(FAMILYA, QUAL, now, QUAL);
667      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
668      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
669      exportTable.put(put);
670
671      put = new Put(ROW2);
672      put.addColumn(FAMILYA, QUAL, now, QUAL);
673      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
674      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
675      exportTable.put(put);
676
677      // Run the export
678      String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000" };
679      assertTrue(runExport(args));
680
681      // Create the table for import
682      String importTableName = name.getMethodName() + "import1";
683      Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
684
685      // Register the wal listener for the import table
686      RegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
687        .getRegions(importTable.getName()).get(0).getRegionInfo();
688      TableWALActionListener walListener = new TableWALActionListener(region);
689      WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
690      wal.registerWALActionsListener(walListener);
691
692      // Run the import with SKIP_WAL
693      args = new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
694        importTableName, FQ_OUTPUT_DIR };
695      assertTrue(runImport(args));
696      // Assert that the wal is not visisted
697      assertTrue(!walListener.isWALVisited());
698      // Ensure that the count is 2 (only one version of key value is obtained)
699      assertTrue(getCount(importTable, null) == 2);
700
701      // Run the import with the default durability option
702      importTableName = name.getMethodName() + "import2";
703      importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
704      region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
705        .getRegions(importTable.getName()).get(0).getRegionInfo();
706      wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
707      walListener = new TableWALActionListener(region);
708      wal.registerWALActionsListener(walListener);
709      args = new String[] { importTableName, FQ_OUTPUT_DIR };
710      assertTrue(runImport(args));
711      // Assert that the wal is visisted
712      assertTrue(walListener.isWALVisited());
713      // Ensure that the count is 2 (only one version of key value is obtained)
714      assertTrue(getCount(importTable, null) == 2);
715    }
716  }
717
718  /**
719   * This listens to the {@link #visitLogEntryBeforeWrite(RegionInfo, WALKey, WALEdit)} to identify
720   * that an entry is written to the Write Ahead Log for the given table.
721   */
722  private static class TableWALActionListener implements WALActionsListener {
723
724    private RegionInfo regionInfo;
725    private boolean isVisited = false;
726
727    public TableWALActionListener(RegionInfo region) {
728      this.regionInfo = region;
729    }
730
731    @Override
732    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
733      if (
734        logKey.getTableName().getNameAsString()
735          .equalsIgnoreCase(this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())
736      ) {
737        isVisited = true;
738      }
739    }
740
741    public boolean isWALVisited() {
742      return isVisited;
743    }
744  }
745}