/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Base set of tests and setup for input formats touching multiple tables. Concrete subclasses
 * supply the input format under test by implementing {@link #initJob(List, Job)}.
 */
public abstract class MultiTableInputFormatTestBase {
  static final Logger LOG = LoggerFactory.getLogger(MultiTableInputFormatTestBase.class);
  public static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static final String TABLE_NAME = "scantest";
  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
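  // Configuration keys used to pass the expected scan boundaries (first and last row key)
  // to the reducer, which asserts on them in its cleanup().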
  static final String KEY_STARTROW = "startRow";
  static final String KEY_LASTROW = "stopRow";

  static List<String> TABLES = Lists.newArrayList();

  static {
    for (int i = 0; i < 3; i++) {
      TABLES.add(TABLE_NAME + i);
    }
  }
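
  // Three identical tables are scanned, so the reducer below can expect every row key to
  // show up exactly three times: once per table.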

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // switch the multi-table input format to DEBUG-level logging
    TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
    // start mini hbase cluster
    TEST_UTIL.startMiniCluster(3);
    // create the tables and fill each with the standard test rows ('aaa' through 'zzz')
    for (String tableName : TABLES) {
      try (Table table =
        TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName), INPUT_FAMILY, 4)) {
        TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
      }
    }
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @After
  public void tearDown() throws Exception {
    Configuration c = TEST_UTIL.getConfiguration();
    FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
  }

  /**
   * Passes the key and value through to the reducer.
   */
  public static class ScanMapper
    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
    /**
     * Passes the key and value on to the reduce phase.
     * @param key     The row key, here "aaa", "aab" etc.
     * @param value   The value; identical to the key in these tests.
     * @param context The task context.
     * @throws IOException When reading the rows fails.
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      makeAssertions(key, value);
      context.write(key, key);
    }

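    /**
     * Verifies that the row carries exactly one value and that it belongs to the expected
     * column family.
     */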
    public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException {
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap();
      if (!cf.containsKey(INPUT_FAMILY)) {
        throw new IOException(
          "Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILY) + "'.");
      }
      String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
      LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) + ", value -> " + val);
    }
  }

  /**
   * Checks the first and last keys seen against the configured scan boundaries.
   */
  public static class ScanReducer
    extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> {
    private String first = null;
    private String last = null;

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values,
      Context context) throws IOException, InterruptedException {
      makeAssertions(key, values);
    }

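    /**
     * Counts the values seen for this key and records the first and last key observed overall;
     * each key must occur exactly once per scanned table.
     */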
    protected void makeAssertions(ImmutableBytesWritable key,
      Iterable<ImmutableBytesWritable> values) {
      int count = 0;
      for (ImmutableBytesWritable value : values) {
        String val = Bytes.toStringBinary(value.get());
        LOG.debug(
          "reduce: key[" + count + "] -> " + Bytes.toStringBinary(key.get()) + ", value -> " + val);
        if (first == null) {
          first = val;
        }
        last = val;
        count++;
      }
      // one occurrence per table, three tables scanned
      assertEquals(3, count);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      Configuration c = context.getConfiguration();
      cleanup(c);
    }

    protected void cleanup(Configuration c) {
      String startRow = c.get(KEY_STARTROW);
      String lastRow = c.get(KEY_LASTROW);
      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
      if (startRow != null && !startRow.isEmpty()) {
        assertEquals(startRow, first);
      }
      if (lastRow != null && !lastRow.isEmpty()) {
        assertEquals(lastRow, last);
      }
    }
  }

  @Test
  public void testScanEmptyToEmpty()
    throws IOException, InterruptedException, ClassNotFoundException {
    testScan(null, null, null);
  }

  @Test
  public void testScanEmptyToAPP()
    throws IOException, InterruptedException, ClassNotFoundException {
    testScan(null, "app", "apo");
  }

  @Test
  public void testScanOBBToOPP() throws IOException, InterruptedException, ClassNotFoundException {
    testScan("obb", "opp", "opo");
  }

  @Test
  public void testScanYZYToEmpty()
    throws IOException, InterruptedException, ClassNotFoundException {
    testScan("yzy", null, "zzz");
  }

  /**
   * Runs a MR scan over every table using the given start and stop rows. Since the stop row is
   * exclusive, {@code last} is the final row key the reducer is expected to see.
   */
  private void testScan(String start, String stop, String last)
    throws IOException, InterruptedException, ClassNotFoundException {
    String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To"
      + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
    LOG.info("Before map/reduce startup - job " + jobName);
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());

    c.set(KEY_STARTROW, start != null ? start : "");
    c.set(KEY_LASTROW, last != null ? last : "");

    List<Scan> scans = new ArrayList<>();

    for (String tableName : TABLES) {
      Scan scan = new Scan();

      scan.addFamily(INPUT_FAMILY);
      // tag each scan with the table it targets; the multi-table input format reads this
      // attribute to decide which table to open for the scan
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));

      if (start != null) {
        scan.withStartRow(Bytes.toBytes(start));
      }
      if (stop != null) {
        scan.withStopRow(Bytes.toBytes(stop));
      }

      scans.add(scan);

      LOG.info("scan before: " + scan);
    }

    runJob(jobName, c, scans);
  }

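  /**
   * Submits the configured scans as a single map/reduce job, waits for completion, and asserts
   * that the job succeeded.
   */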
  protected void runJob(String jobName, Configuration c, List<Scan> scans)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = Job.getInstance(c, jobName);

    initJob(scans, job);
    job.setReducerClass(ScanReducer.class);
    job.setNumReduceTasks(1); // a single reducer, so there is one final "first" and "last" key
    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
    LOG.info("Started " + job.getJobName());
    job.waitForCompletion(true);
    assertTrue(job.isSuccessful());
    LOG.info("After map/reduce completion - job " + jobName);
  }

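  /**
   * Implemented by subclasses to wire the input format under test into the job. A minimal
   * sketch, assuming the stock {@link TableMapReduceUtil} helper fits the format being tested:
   *
   * <pre>
   * protected void initJob(List&lt;Scan&gt; scans, Job job) throws IOException {
   *   TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class,
   *     ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
   * }
   * </pre>
   */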
  protected abstract void initJob(List<Scan> scans, Job job) throws IOException;

}