/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;

/**
 * A base class for a test Map/Reduce job over HBase tables. The map/reduce process we're testing on
 * our tables is simple - take every row in the table, reverse the value of a particular cell, and
 * write it back to the table. Implements common components between mapred and mapreduce
 * implementations.
 */
public abstract class TestTableMapReduceBase {
  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  protected static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
  protected static final TableName TABLE_FOR_NEGATIVE_TESTS = TableName.valueOf("testfailuretable");
  protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
  protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");

  /** The two column families scanned during verification: input first, output second. */
  protected static final byte[][] columns = new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY };

  /**
   * Retrieve my logger instance.
   */
  protected abstract Logger getLog();

  /**
   * Handles API-specifics for setting up and executing the job.
   * @param table the table the job runs against
   * @throws IOException if the job cannot be set up or executed
   */
  protected abstract void runTestOnTable(Table table) throws IOException;

  /**
   * Starts the mini cluster, creates and loads the multi-region table, and creates the table used
   * by negative tests.
   */
  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
    // The handle is only needed while loading, so release it as soon as the load is done.
    try (Table table = UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME,
      new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY })) {
      UTIL.loadTable(table, INPUT_FAMILY, false);
    }
    UTIL.createTable(TABLE_FOR_NEGATIVE_TESTS, new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY });
  }

  /**
   * Deletes the negative-test table and shuts the mini cluster down.
   */
  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.deleteTable(TABLE_FOR_NEGATIVE_TESTS);
    UTIL.shutdownMiniCluster();
  }

  /**
   * Test a map/reduce against a multi-region table
   */
  @Test
  public void testMultiRegionTable() throws IOException {
    // NOTE(review): the Table is not closed here; presumably runTestOnTable implementations take
    // ownership and close it - confirm against the subclasses.
    runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
  }

  /**
   * Runs the same job as {@link #testMultiRegionTable()}, intended to exercise the combiner.
   */
  @Test
  public void testCombiner() throws IOException {
    Configuration conf = new Configuration(UTIL.getConfiguration());
    // force use of combiner for testing purposes
    // NOTE(review): 'conf' is a private copy that is never passed to runTestOnTable, so within this
    // class the setting below has no visible effect. Wiring it through needs an interface change.
    conf.setInt("mapreduce.map.combine.minspills", 1);
    runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
  }

  /**
   * Implements mapper logic for use across APIs: reads the {@code INPUT_FAMILY:INPUT_FAMILY} cell,
   * reverses its string value and returns a {@link Put} that stores the reversed value under
   * {@code OUTPUT_FAMILY} with a null qualifier.
   * @param key   row key; its bytes become the row of the returned {@link Put}
   * @param value the row being mapped; must have size 1 and contain {@code INPUT_FAMILY}
   * @return the {@link Put} carrying the reversed value
   * @throws IOException if the row does not have exactly one cell, lacks the input family, or
   *                     lacks the expected input cell
   */
  protected static Put map(ImmutableBytesWritable key, Result value) throws IOException {
    if (value.size() != 1) {
      throw new IOException("There should only be one input column");
    }
    Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap();
    if (!cf.containsKey(INPUT_FAMILY)) {
      throw new IOException(
        "Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILY) + "'.");
    }

    // Get the original value and reverse it. Fail with a descriptive IOException rather than a
    // bare NullPointerException when the family is present but the expected qualifier is not.
    byte[] originalBytes = value.getValue(INPUT_FAMILY, INPUT_FAMILY);
    if (originalBytes == null) {
      throw new IOException("Wrong input columns. Missing cell: '" + Bytes.toString(INPUT_FAMILY)
        + ":" + Bytes.toString(INPUT_FAMILY) + "'.");
    }
    String reversedValue = new StringBuilder(Bytes.toString(originalBytes)).reverse().toString();

    // Now set the value to be collected
    Put outval = new Put(key.get());
    outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(reversedValue));
    return outval;
  }

  /**
   * Verifies the job output in {@code tableName}, retrying when a cell is still missing.
   * <p>
   * The pause between attempts is read from {@code hbase.client.pause} (default 5000 ms) and the
   * number of attempts from {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} (default 5). The test
   * fails if no attempt succeeds, or if the thread is interrupted while waiting.
   * @param tableName table holding the job output
   * @throws IOException if the table cannot be opened or scanned, or a row has too many cells
   */
  protected void verify(TableName tableName) throws IOException {
    long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
    int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    boolean verified = false;
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < numRetries; i++) {
        try {
          getLog().info("Verification attempt #{}", i);
          verifyAttempt(table);
          verified = true;
          break;
        } catch (NullPointerException e) {
          // If here, a cell was empty. Presume its because updates came in
          // after the scanner had been opened. Wait a while and retry.
          getLog().debug("Verification attempt failed: {}", e.getMessage());
        }
        try {
          Thread.sleep(pause);
        } catch (InterruptedException e) {
          // Preserve the interrupt for callers and stop retrying instead of spinning through the
          // remaining attempts with an already-interrupted thread.
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    assertTrue("Verification of " + tableName + " did not succeed within " + numRetries
      + " attempts", verified);
  }

  /**
   * Looks at every value of the mapreduce output and verifies that indeed the values have been
   * reversed.
   * @param table Table to scan.
   * @throws IOException          if the scan fails or a row holds more than two cells
   * @throws NullPointerException if we failed to find a cell value
   */
  private void verifyAttempt(final Table table) throws IOException, NullPointerException {
    Scan scan = new Scan();
    TableInputFormat.addColumns(scan, columns);
    try (ResultScanner scanner = table.getScanner(scan)) {
      Iterator<Result> itr = scanner.iterator();
      assertTrue("Scan of " + table.getName() + " returned no rows", itr.hasNext());
      while (itr.hasNext()) {
        Result r = itr.next();
        // This is a correctness check, so it must not depend on the configured log level.
        if (r.size() > 2) {
          throw new IOException("Too many results, expected 2 got " + r.size());
        }

        // Only the first two cells of the row are of interest.
        byte[] firstValue = null;
        byte[] secondValue = null;
        int count = 0;
        for (Cell kv : r.listCells()) {
          if (count == 0) {
            firstValue = CellUtil.cloneValue(kv);
          } else if (count == 1) {
            secondValue = CellUtil.cloneValue(kv);
          }
          count++;
          if (count == 2) {
            break;
          }
        }

        // A missing cell is reported as NullPointerException so that verify() retries.
        if (firstValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) + ": first value is null");
        }
        if (secondValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) + ": second value is null");
        }

        String first = Bytes.toString(firstValue);
        String second = Bytes.toString(reverse(secondValue));
        if (!first.equals(second)) {
          fail("second key is not the reverse of first. row=" + Bytes.toStringBinary(r.getRow())
            + ", first value=" + first + ", second value=" + second);
        }
      }
    }
  }

  /**
   * Returns a new array holding the bytes of {@code bytes} in reverse order.
   */
  private static byte[] reverse(byte[] bytes) {
    byte[] reversed = new byte[bytes.length];
    for (int i = 0, j = bytes.length - 1; j >= 0; j--, i++) {
      reversed[i] = bytes[j];
    }
    return reversed;
  }
}