/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This tests the TableInputFormat and its recovery semantics
 */
@Category(LargeTests.class)
public class TestTableInputFormat {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableInputFormat.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormat.class);

  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  // NOTE(review): never started/used in the visible tests; kept for interface stability.
  private static MiniMRCluster mrCluster;
  static final byte[] FAMILY = Bytes.toBytes("family");

  // NOTE(review): unused in the visible tests; kept for interface stability.
  private static final byte[][] columns = new byte[][] { FAMILY };

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }

  /**
   * Setup a table with two rows and values.
   * @param tableName the name of the table to create
   * @return A Table instance for the created table.
   */
  public static Table createTable(byte[] tableName) throws IOException {
    return createTable(tableName, new byte[][] { FAMILY });
  }

  /**
   * Setup a table with two rows ("aaa" and "bbb") and one value per column family.
   * @param tableName the name of the table to create
   * @param families  the column families to create in the table
   * @return A Table instance for the created table.
   */
  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
    Put p = new Put(Bytes.toBytes("aaa"));
    for (byte[] family : families) {
      p.addColumn(family, null, Bytes.toBytes("value aaa"));
    }
    table.put(p);
    p = new Put(Bytes.toBytes("bbb"));
    for (byte[] family : families) {
      p.addColumn(family, null, Bytes.toBytes("value bbb"));
    }
    table.put(p);
    return table;
  }

  /**
   * Verify that the result and key have expected values.
   * @param r single row result
   * @param key the row key
   * @param expectedKey the expected key
   * @param expectedValue the expected value
   * @return true if the result contains the expected key and value, false otherwise.
   */
  static boolean checkResult(Result r, ImmutableBytesWritable key, byte[] expectedKey,
    byte[] expectedValue) {
    assertEquals(0, key.compareTo(expectedKey));
    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
    // Only one value is stored per family in these fixtures, so the first entry suffices.
    byte[] value = vals.values().iterator().next();
    assertTrue(Arrays.equals(value, expectedValue));
    return true; // if succeed
  }

  /**
   * Create table data and run tests on specified htable using the o.a.h.hbase.mapreduce API.
   * Drives a TableRecordReaderImpl over the two fixture rows and verifies both are returned
   * in order, followed by end-of-data.
   */
  static void runTestMapreduce(Table table) throws IOException, InterruptedException {
    org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
      new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
    Scan s = new Scan();
    s.withStartRow(Bytes.toBytes("aaa"));
    s.withStopRow(Bytes.toBytes("zzz"));
    s.addFamily(FAMILY);
    trr.setScan(s);
    trr.setHTable(table);

    trr.initialize(null, null);
    Result r = new Result();
    ImmutableBytesWritable key = new ImmutableBytesWritable();

    boolean more = trr.nextKeyValue();
    assertTrue(more);
    key = trr.getCurrentKey();
    r = trr.getCurrentValue();
    checkResult(r, key, Bytes.toBytes("aaa"), Bytes.toBytes("value aaa"));

    more = trr.nextKeyValue();
    assertTrue(more);
    key = trr.getCurrentKey();
    r = trr.getCurrentValue();
    checkResult(r, key, Bytes.toBytes("bbb"), Bytes.toBytes("value bbb"));

    // no more data
    more = trr.nextKeyValue();
    assertFalse(more);
  }

  /**
   * Create a table that IOE's on first scanner next call
   * @param name    the table name
   * @param failCnt how many getScanner() calls return a scanner that fails on next()
   */
  static Table createIOEScannerTable(byte[] name, final int failCnt) throws IOException {
    // build up a mock scanner stuff to fail the first time
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // first invocation return the busted mock scanner
        if (cnt++ < failCnt) {
          // create mock ResultScanner that always fails.
          Scan scan = mock(Scan.class);
          doReturn(Bytes.toBytes("bogus")).when(scan).getStartRow(); // avoid npe
          ResultScanner scanner = mock(ResultScanner.class);
          // simulate TimeoutException / IOException
          doThrow(new IOException("Injected exception")).when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner(any(Scan.class));
    return htable;
  }

  /**
   * Create a table that throws a NotServingRegionException on first scanner next call
   * @param name    the table name
   * @param failCnt how many getScanner() calls return a scanner that fails on next()
   */
  static Table createDNRIOEScannerTable(byte[] name, final int failCnt) throws IOException {
    // build up a mock scanner stuff to fail the first time
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // first invocation return the busted mock scanner
        if (cnt++ < failCnt) {
          // create mock ResultScanner that always fails.
          Scan scan = mock(Scan.class);
          doReturn(Bytes.toBytes("bogus")).when(scan).getStartRow(); // avoid npe
          ResultScanner scanner = mock(ResultScanner.class);

          // NOTE(review): the real scanner opened here is intentionally discarded (and never
          // closed) so the region is actually touched before the injected failure.
          invocation.callRealMethod(); // simulate NotServingRegionException
          doThrow(new NotServingRegionException("Injected simulated TimeoutException"))
            .when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner(any(Scan.class));
    return htable;
  }

  /**
   * Run test assuming no errors using newer mapreduce api
   */
  @Test
  public void testTableRecordReaderMapreduce() throws IOException, InterruptedException {
    Table table = createTable(Bytes.toBytes("table1-mr"));
    runTestMapreduce(table);
  }

  /**
   * Run test assuming Scanner IOException failure using newer mapreduce api
   */
  @Test
  public void testTableRecordReaderScannerFailMapreduce() throws IOException, InterruptedException {
    Table htable = createIOEScannerTable(Bytes.toBytes("table2-mr"), 1);
    runTestMapreduce(htable);
  }

  /**
   * Run test assuming Scanner IOException failure using newer mapreduce api
   */
  @Test(expected = IOException.class)
  public void testTableRecordReaderScannerFailMapreduceTwice()
    throws IOException, InterruptedException {
    // Two consecutive failures exhaust the reader's single retry, so the IOE surfaces.
    Table htable = createIOEScannerTable(Bytes.toBytes("table3-mr"), 2);
    runTestMapreduce(htable);
  }

  /**
   * Run test assuming NotServingRegionException using newer mapreduce api
   */
  @Test
  public void testTableRecordReaderScannerTimeoutMapreduce()
    throws IOException, InterruptedException {
    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table4-mr"), 1);
    runTestMapreduce(htable);
  }

  /**
   * Run test assuming NotServingRegionException using newer mapreduce api
   */
  @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)
  public void testTableRecordReaderScannerTimeoutMapreduceTwice()
    throws IOException, InterruptedException {
    // Two consecutive failures exhaust the reader's single retry, so the NSRE surfaces.
    Table htable = createDNRIOEScannerTable(Bytes.toBytes("table5-mr"), 2);
    runTestMapreduce(htable);
  }

  /**
   * Verify the example we present in javadocs on TableInputFormatBase
   */
  @Test
  public void testExtensionOfTableInputFormatBase()
    throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat that extends InputFormatBase");
    final Table htable = createTable(Bytes.toBytes("exampleTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleTIF.class);
  }

  @Test
  public void testJobConfigurableExtensionOfTableInputFormatBase()
    throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info(
      "testing use of an InputFormat that extends InputFormatBase, " + "using JobConfigurable.");
    final Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleJobConfigurableTIF.class);
  }

  @Test
  public void testDeprecatedExtensionOfTableInputFormatBase()
    throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat that extends InputFormatBase, "
      + "using the approach documented in 0.98.");
    final Table htable = createTable(Bytes.toBytes("exampleDeprecatedTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleDeprecatedTIF.class);
  }

  /**
   * Run a map-only job with the given InputFormat over the example table and check, via
   * counters, that the "aa.*" row filter let exactly row "aaa" through in both families.
   */
  void testInputFormat(Class<? extends InputFormat> clazz)
    throws IOException, InterruptedException, ClassNotFoundException {
    final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
    job.setInputFormatClass(clazz);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapperClass(ExampleVerifier.class);
    job.setNumReduceTasks(0);

    LOG.debug("submitting job.");
    assertTrue("job failed!", job.waitForCompletion(true));
    assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
    assertEquals("Saw any instances of the filtered out row.", 0, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
    assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
    assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
    assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
    assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
  }

  /**
   * Mapper that tallies every cell it sees into per-row, per-family, and per-value counters,
   * which testInputFormat() then asserts on.
   */
  public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> {

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException {
      for (Cell cell : value.listCells()) {
        context
          .getCounter(TestTableInputFormat.class.getName() + ":row",
            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
          .increment(1L);
        context
          .getCounter(TestTableInputFormat.class.getName() + ":family",
            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
          .increment(1L);
        context
          .getCounter(TestTableInputFormat.class.getName() + ":value",
            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
          .increment(1L);
      }
    }

  }

  /**
   * Example extension mirroring the (0.98-era) TableInputFormatBase javadoc: configure the
   * table and scan from JobConfigurable.configure().
   */
  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        Connection connection = ConnectionFactory.createConnection(job);
        // NOTE(review): this Table is only used for its name and is never closed — kept to
        // faithfully reproduce the deprecated documented example.
        Table exampleTable = connection.getTable(TableName.valueOf(("exampleDeprecatedTable")));
        // mandatory
        initializeTable(connection, exampleTable.getName());
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
        // optional
        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter =
          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to configure for job.", exception);
      }
    }

  }

  /**
   * Example extension that initializes the table and scan via JobConfigurable.configure().
   */
  public static class ExampleJobConfigurableTIF extends TableInputFormatBase
    implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
        TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
        // mandatory
        initializeTable(connection, tableName);
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
        // optional
        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter =
          new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to initialize.", exception);
      }
    }
  }

  /**
   * Example extension that initializes the table and scan via initialize(JobContext), the
   * approach shown in the current TableInputFormatBase javadoc.
   */
  public static class ExampleTIF extends TableInputFormatBase {

    @Override
    protected void initialize(JobContext job) throws IOException {
      Connection connection =
        ConnectionFactory.createConnection(HBaseConfiguration.create(job.getConfiguration()));
      TableName tableName = TableName.valueOf("exampleTable");
      // mandatory
      initializeTable(connection, tableName);
      byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
      // optional
      Scan scan = new Scan();
      for (byte[] family : inputColumns) {
        scan.addFamily(family);
      }
      Filter exampleFilter =
        new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
      scan.setFilter(exampleFilter);
      setScan(scan);
    }

  }
}