001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.Mockito.*;
024import static org.mockito.Mockito.doReturn;
025import static org.mockito.Mockito.doThrow;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.spy;
028
029import java.io.IOException;
030import java.util.Arrays;
031import java.util.Map;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CompareOperator;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.NotServingRegionException;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.ConnectionFactory;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.Result;
043import org.apache.hadoop.hbase.client.ResultScanner;
044import org.apache.hadoop.hbase.client.Scan;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.filter.Filter;
047import org.apache.hadoop.hbase.filter.RegexStringComparator;
048import org.apache.hadoop.hbase.filter.RowFilter;
049import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
050import org.apache.hadoop.hbase.testclassification.LargeTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.io.NullWritable;
053import org.apache.hadoop.mapred.JobConf;
054import org.apache.hadoop.mapred.JobConfigurable;
055import org.apache.hadoop.mapred.MiniMRCluster;
056import org.apache.hadoop.mapreduce.InputFormat;
057import org.apache.hadoop.mapreduce.Job;
058import org.apache.hadoop.mapreduce.JobContext;
059import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
060import org.junit.AfterClass;
061import org.junit.Before;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.mockito.invocation.InvocationOnMock;
067import org.mockito.stubbing.Answer;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071/**
072 * This tests the TableInputFormat and its recovery semantics
073 */
074@Category(LargeTests.class)
075public class TestTableInputFormat {
076
077  @ClassRule
078  public static final HBaseClassTestRule CLASS_RULE =
079    HBaseClassTestRule.forClass(TestTableInputFormat.class);
080
081  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class);
082
083  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
084  private static MiniMRCluster mrCluster;
085  static final byte[] FAMILY = Bytes.toBytes("family");
086
087  private static final byte[][] columns = new byte[][] { FAMILY };
088
089  @BeforeClass
090  public static void beforeClass() throws Exception {
091    UTIL.startMiniCluster();
092  }
093
094  @AfterClass
095  public static void afterClass() throws Exception {
096    UTIL.shutdownMiniCluster();
097  }
098
099  @Before
100  public void before() throws IOException {
101    LOG.info("before");
102    UTIL.ensureSomeRegionServersAvailable(1);
103    LOG.info("before done");
104  }
105
106  /**
107   * Setup a table with two rows and values.
108   * @return A Table instance for the created table.
109   */
110  public static Table createTable(byte[] tableName) throws IOException {
111    return createTable(tableName, new byte[][] { FAMILY });
112  }
113
114  /**
115   * Setup a table with two rows and values per column family.
116   * @return A Table instance for the created table.
117   */
118  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
119    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
120    Put p = new Put("aaa".getBytes());
121    for (byte[] family : families) {
122      p.addColumn(family, null, "value aaa".getBytes());
123    }
124    table.put(p);
125    p = new Put("bbb".getBytes());
126    for (byte[] family : families) {
127      p.addColumn(family, null, "value bbb".getBytes());
128    }
129    table.put(p);
130    return table;
131  }
132
133  /**
134   * Verify that the result and key have expected values.
135   * @param r             single row result
136   * @param key           the row key
137   * @param expectedKey   the expected key
138   * @param expectedValue the expected value
139   * @return true if the result contains the expected key and value, false otherwise.
140   */
141  static boolean checkResult(Result r, ImmutableBytesWritable key, byte[] expectedKey,
142    byte[] expectedValue) {
143    assertEquals(0, key.compareTo(expectedKey));
144    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
145    byte[] value = vals.values().iterator().next();
146    assertTrue(Arrays.equals(value, expectedValue));
147    return true; // if succeed
148  }
149
150  /**
151   * Create table data and run tests on specified htable using the o.a.h.hbase.mapreduce API.
152   */
153  static void runTestMapreduce(Table table) throws IOException, InterruptedException {
154    org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
155      new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
156    Scan s = new Scan();
157    s.setStartRow("aaa".getBytes());
158    s.setStopRow("zzz".getBytes());
159    s.addFamily(FAMILY);
160    trr.setScan(s);
161    trr.setHTable(table);
162
163    trr.initialize(null, null);
164    Result r = new Result();
165    ImmutableBytesWritable key = new ImmutableBytesWritable();
166
167    boolean more = trr.nextKeyValue();
168    assertTrue(more);
169    key = trr.getCurrentKey();
170    r = trr.getCurrentValue();
171    checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());
172
173    more = trr.nextKeyValue();
174    assertTrue(more);
175    key = trr.getCurrentKey();
176    r = trr.getCurrentValue();
177    checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());
178
179    // no more data
180    more = trr.nextKeyValue();
181    assertFalse(more);
182  }
183
184  /**
185   * Create a table that IOE's on first scanner next call
186   */
187  static Table createIOEScannerTable(byte[] name, final int failCnt) throws IOException {
188    // build up a mock scanner stuff to fail the first time
189    Answer<ResultScanner> a = new Answer<ResultScanner>() {
190      int cnt = 0;
191
192      @Override
193      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
194        // first invocation return the busted mock scanner
195        if (cnt++ < failCnt) {
196          // create mock ResultScanner that always fails.
197          Scan scan = mock(Scan.class);
198          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
199          ResultScanner scanner = mock(ResultScanner.class);
200          // simulate TimeoutException / IOException
201          doThrow(new IOException("Injected exception")).when(scanner).next();
202          return scanner;
203        }
204
205        // otherwise return the real scanner.
206        return (ResultScanner) invocation.callRealMethod();
207      }
208    };
209
210    Table htable = spy(createTable(name));
211    doAnswer(a).when(htable).getScanner(any(Scan.class));
212    return htable;
213  }
214
215  /**
216   * Create a table that throws a NotServingRegionException on first scanner next call
217   */
218  static Table createDNRIOEScannerTable(byte[] name, final int failCnt) throws IOException {
219    // build up a mock scanner stuff to fail the first time
220    Answer<ResultScanner> a = new Answer<ResultScanner>() {
221      int cnt = 0;
222
223      @Override
224      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
225        // first invocation return the busted mock scanner
226        if (cnt++ < failCnt) {
227          // create mock ResultScanner that always fails.
228          Scan scan = mock(Scan.class);
229          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
230          ResultScanner scanner = mock(ResultScanner.class);
231
232          invocation.callRealMethod(); // simulate NotServingRegionException
233          doThrow(new NotServingRegionException("Injected simulated TimeoutException"))
234            .when(scanner).next();
235          return scanner;
236        }
237
238        // otherwise return the real scanner.
239        return (ResultScanner) invocation.callRealMethod();
240      }
241    };
242
243    Table htable = spy(createTable(name));
244    doAnswer(a).when(htable).getScanner(any(Scan.class));
245    return htable;
246  }
247
248  /**
249   * Run test assuming no errors using newer mapreduce api
250   */
251  @Test
252  public void testTableRecordReaderMapreduce() throws IOException, InterruptedException {
253    Table table = createTable("table1-mr".getBytes());
254    runTestMapreduce(table);
255  }
256
257  /**
258   * Run test assuming Scanner IOException failure using newer mapreduce api
259   */
260  @Test
261  public void testTableRecordReaderScannerFailMapreduce() throws IOException, InterruptedException {
262    Table htable = createIOEScannerTable("table2-mr".getBytes(), 1);
263    runTestMapreduce(htable);
264  }
265
266  /**
267   * Run test assuming Scanner IOException failure using newer mapreduce api
268   */
269  @Test(expected = IOException.class)
270  public void testTableRecordReaderScannerFailMapreduceTwice()
271    throws IOException, InterruptedException {
272    Table htable = createIOEScannerTable("table3-mr".getBytes(), 2);
273    runTestMapreduce(htable);
274  }
275
276  /**
277   * Run test assuming NotServingRegionException using newer mapreduce api
278   */
279  @Test
280  public void testTableRecordReaderScannerTimeoutMapreduce()
281    throws IOException, InterruptedException {
282    Table htable = createDNRIOEScannerTable("table4-mr".getBytes(), 1);
283    runTestMapreduce(htable);
284  }
285
286  /**
287   * Run test assuming NotServingRegionException using newer mapreduce api
288   */
289  @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)
290  public void testTableRecordReaderScannerTimeoutMapreduceTwice()
291    throws IOException, InterruptedException {
292    Table htable = createDNRIOEScannerTable("table5-mr".getBytes(), 2);
293    runTestMapreduce(htable);
294  }
295
296  /**
297   * Verify the example we present in javadocs on TableInputFormatBase
298   */
299  @Test
300  public void testExtensionOfTableInputFormatBase()
301    throws IOException, InterruptedException, ClassNotFoundException {
302    LOG.info("testing use of an InputFormat taht extends InputFormatBase");
303    final Table htable = createTable(Bytes.toBytes("exampleTable"),
304      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
305    testInputFormat(ExampleTIF.class);
306  }
307
308  @Test
309  public void testJobConfigurableExtensionOfTableInputFormatBase()
310    throws IOException, InterruptedException, ClassNotFoundException {
311    LOG.info(
312      "testing use of an InputFormat taht extends InputFormatBase, " + "using JobConfigurable.");
313    final Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
314      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
315    testInputFormat(ExampleJobConfigurableTIF.class);
316  }
317
318  @Test
319  public void testDeprecatedExtensionOfTableInputFormatBase()
320    throws IOException, InterruptedException, ClassNotFoundException {
321    LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
322      + "using the approach documented in 0.98.");
323    final Table htable = createTable(Bytes.toBytes("exampleDeprecatedTable"),
324      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
325    testInputFormat(ExampleDeprecatedTIF.class);
326  }
327
328  void testInputFormat(Class<? extends InputFormat> clazz)
329    throws IOException, InterruptedException, ClassNotFoundException {
330    final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
331    job.setInputFormatClass(clazz);
332    job.setOutputFormatClass(NullOutputFormat.class);
333    job.setMapperClass(ExampleVerifier.class);
334    job.setNumReduceTasks(0);
335
336    LOG.debug("submitting job.");
337    assertTrue("job failed!", job.waitForCompletion(true));
338    assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters()
339      .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
340    assertEquals("Saw any instances of the filtered out row.", 0, job.getCounters()
341      .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
342    assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters()
343      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
344    assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters()
345      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
346    assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters()
347      .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
348    assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters()
349      .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
350  }
351
352  public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> {
353
354    @Override
355    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException {
356      for (Cell cell : value.listCells()) {
357        context
358          .getCounter(TestTableInputFormat.class.getName() + ":row",
359            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
360          .increment(1l);
361        context
362          .getCounter(TestTableInputFormat.class.getName() + ":family",
363            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
364          .increment(1l);
365        context
366          .getCounter(TestTableInputFormat.class.getName() + ":value",
367            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
368          .increment(1l);
369      }
370    }
371
372  }
373
374  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {
375
376    @Override
377    public void configure(JobConf job) {
378      try {
379        Connection connection = ConnectionFactory.createConnection(job);
380        Table exampleTable = connection.getTable(TableName.valueOf(("exampleDeprecatedTable")));
381        // mandatory
382        initializeTable(connection, exampleTable.getName());
383        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
384        // optional
385        Scan scan = new Scan();
386        for (byte[] family : inputColumns) {
387          scan.addFamily(family);
388        }
389        Filter exampleFilter =
390          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
391        scan.setFilter(exampleFilter);
392        setScan(scan);
393      } catch (IOException exception) {
394        throw new RuntimeException("Failed to configure for job.", exception);
395      }
396    }
397
398  }
399
400  public static class ExampleJobConfigurableTIF extends TableInputFormatBase
401    implements JobConfigurable {
402
403    @Override
404    public void configure(JobConf job) {
405      try {
406        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
407        TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
408        // mandatory
409        initializeTable(connection, tableName);
410        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
411        // optional
412        Scan scan = new Scan();
413        for (byte[] family : inputColumns) {
414          scan.addFamily(family);
415        }
416        Filter exampleFilter =
417          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
418        scan.setFilter(exampleFilter);
419        setScan(scan);
420      } catch (IOException exception) {
421        throw new RuntimeException("Failed to initialize.", exception);
422      }
423    }
424  }
425
426  public static class ExampleTIF extends TableInputFormatBase {
427
428    @Override
429    protected void initialize(JobContext job) throws IOException {
430      Connection connection =
431        ConnectionFactory.createConnection(HBaseConfiguration.create(job.getConfiguration()));
432      TableName tableName = TableName.valueOf("exampleTable");
433      // mandatory
434      initializeTable(connection, tableName);
435      byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
436      // optional
437      Scan scan = new Scan();
438      for (byte[] family : inputColumns) {
439        scan.addFamily(family);
440      }
441      Filter exampleFilter =
442        new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
443      scan.setFilter(exampleFilter);
444      setScan(scan);
445    }
446
447  }
448}