001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.File; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Locale; 028import java.util.Map; 029import java.util.NavigableMap; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileUtil; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseTestingUtil; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Result; 036import org.apache.hadoop.hbase.client.Scan; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 039import org.apache.hadoop.hbase.logging.Log4jUtils; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.io.NullWritable; 042import org.apache.hadoop.mapreduce.Job; 043import org.apache.hadoop.mapreduce.Reducer; 044import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 045import org.junit.After; 046import org.junit.AfterClass; 047import org.junit.BeforeClass; 048import org.junit.Test; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 053 054/** 055 * Base set of tests and setup for input formats touching multiple tables. 056 */ 057public abstract class MultiTableInputFormatTestBase { 058 static final Logger LOG = LoggerFactory.getLogger(TestMultiTableInputFormat.class); 059 public static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 060 static final String TABLE_NAME = "scantest"; 061 static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); 062 static final String KEY_STARTROW = "startRow"; 063 static final String KEY_LASTROW = "stpRow"; 064 065 static List<String> TABLES = Lists.newArrayList(); 066 067 static { 068 for (int i = 0; i < 3; i++) { 069 TABLES.add(TABLE_NAME + String.valueOf(i)); 070 } 071 } 072 073 @BeforeClass 074 public static void setUpBeforeClass() throws Exception { 075 // switch TIF to log at DEBUG level 076 Log4jUtils.enableDebug(MultiTableInputFormatBase.class); 077 // start mini hbase cluster 078 TEST_UTIL.startMiniCluster(3); 079 // create and fill table 080 for (String tableName : TABLES) { 081 try (Table table = 082 TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName), INPUT_FAMILY, 4)) { 083 TEST_UTIL.loadTable(table, INPUT_FAMILY, false); 084 } 085 } 086 } 087 088 @AfterClass 089 public static void tearDownAfterClass() throws Exception { 090 TEST_UTIL.shutdownMiniCluster(); 091 } 092 093 @After 094 public void tearDown() throws Exception { 095 Configuration c = TEST_UTIL.getConfiguration(); 096 FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir"))); 097 } 098 099 /** 100 * Pass the key and value to reducer. 101 */ 102 public static class ScanMapper 103 extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> { 104 /** 105 * Pass the key and value to reduce. 106 * @param key The key, here "aaa", "aab" etc. 107 * @param value The value is the same as the key. 108 * @param context The task context. 109 * @throws IOException When reading the rows fails. 110 */ 111 @Override 112 public void map(ImmutableBytesWritable key, Result value, Context context) 113 throws IOException, InterruptedException { 114 makeAssertions(key, value); 115 context.write(key, key); 116 } 117 118 public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException { 119 if (value.size() != 1) { 120 throw new IOException("There should only be one input column"); 121 } 122 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap(); 123 if (!cf.containsKey(INPUT_FAMILY)) { 124 throw new IOException( 125 "Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILY) + "'."); 126 } 127 String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null)); 128 LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) + ", value -> " + val); 129 } 130 } 131 132 /** 133 * Checks the last and first keys seen against the scanner boundaries. 134 */ 135 public static class ScanReducer 136 extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> { 137 private String first = null; 138 private String last = null; 139 140 @Override 141 protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, 142 Context context) throws IOException, InterruptedException { 143 makeAssertions(key, values); 144 } 145 146 protected void makeAssertions(ImmutableBytesWritable key, 147 Iterable<ImmutableBytesWritable> values) { 148 int count = 0; 149 for (ImmutableBytesWritable value : values) { 150 String val = Bytes.toStringBinary(value.get()); 151 LOG.debug( 152 "reduce: key[" + count + "] -> " + Bytes.toStringBinary(key.get()) + ", value -> " + val); 153 if (first == null) first = val; 154 last = val; 155 count++; 156 } 157 assertEquals(3, count); 158 } 159 160 @Override 161 protected void cleanup(Context context) throws IOException, InterruptedException { 162 Configuration c = context.getConfiguration(); 163 cleanup(c); 164 } 165 166 protected void cleanup(Configuration c) { 167 String startRow = c.get(KEY_STARTROW); 168 String lastRow = c.get(KEY_LASTROW); 169 LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\""); 170 LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\""); 171 if (startRow != null && startRow.length() > 0) { 172 assertEquals(startRow, first); 173 } 174 if (lastRow != null && lastRow.length() > 0) { 175 assertEquals(lastRow, last); 176 } 177 } 178 } 179 180 @Test 181 public void testScanEmptyToEmpty() 182 throws IOException, InterruptedException, ClassNotFoundException { 183 testScan(null, null, null); 184 } 185 186 @Test 187 public void testScanEmptyToAPP() 188 throws IOException, InterruptedException, ClassNotFoundException { 189 testScan(null, "app", "apo"); 190 } 191 192 @Test 193 public void testScanOBBToOPP() throws IOException, InterruptedException, ClassNotFoundException { 194 testScan("obb", "opp", "opo"); 195 } 196 197 @Test 198 public void testScanYZYToEmpty() 199 throws IOException, InterruptedException, ClassNotFoundException { 200 testScan("yzy", null, "zzz"); 201 } 202 203 /** 204 * Tests a MR scan using specific start and stop rows. 205 */ 206 private void testScan(String start, String stop, String last) 207 throws IOException, InterruptedException, ClassNotFoundException { 208 String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To" 209 + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty"); 210 LOG.info("Before map/reduce startup - job " + jobName); 211 Configuration c = new Configuration(TEST_UTIL.getConfiguration()); 212 213 c.set(KEY_STARTROW, start != null ? start : ""); 214 c.set(KEY_LASTROW, last != null ? last : ""); 215 216 List<Scan> scans = new ArrayList<>(); 217 218 for (String tableName : TABLES) { 219 Scan scan = new Scan(); 220 221 scan.addFamily(INPUT_FAMILY); 222 scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName)); 223 224 if (start != null) { 225 scan.withStartRow(Bytes.toBytes(start)); 226 } 227 if (stop != null) { 228 scan.withStopRow(Bytes.toBytes(stop)); 229 } 230 231 scans.add(scan); 232 233 LOG.info("scan before: " + scan); 234 } 235 236 runJob(jobName, c, scans); 237 } 238 239 protected void runJob(String jobName, Configuration c, List<Scan> scans) 240 throws IOException, InterruptedException, ClassNotFoundException { 241 Job job = new Job(c, jobName); 242 243 initJob(scans, job); 244 job.setReducerClass(ScanReducer.class); 245 job.setNumReduceTasks(1); // one to get final "first" and "last" key 246 FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); 247 LOG.info("Started " + job.getJobName()); 248 job.waitForCompletion(true); 249 assertTrue(job.isSuccessful()); 250 LOG.info("After map/reduce completion - job " + jobName); 251 } 252 253 protected abstract void initJob(List<Scan> scans, Job job) throws IOException; 254 255}