001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertTrue;
021import static org.junit.Assert.fail;
022
023import java.io.File;
024import java.io.IOException;
025import java.util.Iterator;
026import java.util.Map;
027import java.util.NavigableMap;
028import org.apache.hadoop.fs.FileUtil;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.Result;
038import org.apache.hadoop.hbase.client.ResultScanner;
039import org.apache.hadoop.hbase.client.Scan;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.apache.hadoop.hbase.testclassification.MapReduceTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.apache.hadoop.mapreduce.Job;
046import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
047import org.junit.AfterClass;
048import org.junit.BeforeClass;
049import org.junit.ClassRule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
056 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing on our tables is
057 * simple - take every row in the table, reverse the value of a particular cell, and write it back
058 * to the table.
059 */
060@Category({ MapReduceTests.class, LargeTests.class })
061public class TestMultithreadedTableMapper {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065    HBaseClassTestRule.forClass(TestMultithreadedTableMapper.class);
066
067  private static final Logger LOG = LoggerFactory.getLogger(TestMultithreadedTableMapper.class);
068  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
069  static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
070  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
071  static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
072  static final int NUMBER_OF_THREADS = 10;
073
074  @BeforeClass
075  public static void beforeClass() throws Exception {
076    // Up the handlers; this test needs more than usual.
077    UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
078    UTIL.startMiniCluster();
079    Table table = UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME,
080      new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY });
081    UTIL.loadTable(table, INPUT_FAMILY, false);
082    UTIL.waitUntilAllRegionsAssigned(MULTI_REGION_TABLE_NAME);
083  }
084
085  @AfterClass
086  public static void afterClass() throws Exception {
087    UTIL.shutdownMiniCluster();
088  }
089
090  /**
091   * Pass the given key and processed record reduce
092   */
093  public static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {
094
095    /**
096     * Pass the key, and reversed value to reduce
097     */
098    @Override
099    public void map(ImmutableBytesWritable key, Result value, Context context)
100      throws IOException, InterruptedException {
101      if (value.size() != 1) {
102        throw new IOException("There should only be one input column");
103      }
104      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap();
105      if (!cf.containsKey(INPUT_FAMILY)) {
106        throw new IOException(
107          "Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILY) + "'.");
108      }
109      // Get the original value and reverse it
110      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
111      StringBuilder newValue = new StringBuilder(originalValue);
112      newValue.reverse();
113      // Now set the value to be collected
114      Put outval = new Put(key.get());
115      outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
116      context.write(key, outval);
117    }
118  }
119
120  /**
121   * Test multithreadedTableMappper map/reduce against a multi-region table
122   */
123  @Test
124  public void testMultithreadedTableMapper()
125    throws IOException, InterruptedException, ClassNotFoundException {
126    runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
127  }
128
129  private void runTestOnTable(Table table)
130    throws IOException, InterruptedException, ClassNotFoundException {
131    Job job = null;
132    try {
133      LOG.info("Before map/reduce startup");
134      job = new Job(table.getConfiguration(), "process column contents");
135      job.setNumReduceTasks(1);
136      Scan scan = new Scan();
137      scan.addFamily(INPUT_FAMILY);
138      TableMapReduceUtil.initTableMapperJob(table.getName(), scan, MultithreadedTableMapper.class,
139        ImmutableBytesWritable.class, Put.class, job);
140      MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class);
141      MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS);
142      TableMapReduceUtil.initTableReducerJob(table.getName().getNameAsString(),
143        IdentityTableReducer.class, job);
144      FileOutputFormat.setOutputPath(job, new Path("test"));
145      LOG.info("Started " + table.getName());
146      assertTrue(job.waitForCompletion(true));
147      LOG.info("After map/reduce completion");
148      // verify map-reduce results
149      verify(table.getName());
150    } finally {
151      table.close();
152      if (job != null) {
153        FileUtil.fullyDelete(new File(job.getConfiguration().get("hadoop.tmp.dir")));
154      }
155    }
156  }
157
158  private void verify(TableName tableName) throws IOException {
159    Table table = UTIL.getConnection().getTable(tableName);
160    boolean verified = false;
161    long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
162    int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
163    for (int i = 0; i < numRetries; i++) {
164      try {
165        LOG.info("Verification attempt #" + i);
166        verifyAttempt(table);
167        verified = true;
168        break;
169      } catch (NullPointerException e) {
170        // If here, a cell was empty. Presume its because updates came in
171        // after the scanner had been opened. Wait a while and retry.
172        LOG.debug("Verification attempt failed: " + e.getMessage());
173      }
174      try {
175        Thread.sleep(pause);
176      } catch (InterruptedException e) {
177        // continue
178      }
179    }
180    assertTrue(verified);
181    table.close();
182  }
183
184  /**
185   * Looks at every value of the mapreduce output and verifies that indeed the values have been
186   * reversed.
187   * @param table Table to scan.
188   * @throws NullPointerException if we failed to find a cell value
189   */
190  private void verifyAttempt(final Table table) throws IOException, NullPointerException {
191    Scan scan = new Scan();
192    scan.addFamily(INPUT_FAMILY);
193    scan.addFamily(OUTPUT_FAMILY);
194    ResultScanner scanner = table.getScanner(scan);
195    try {
196      Iterator<Result> itr = scanner.iterator();
197      assertTrue(itr.hasNext());
198      while (itr.hasNext()) {
199        Result r = itr.next();
200        if (LOG.isDebugEnabled()) {
201          if (r.size() > 2) {
202            throw new IOException("Too many results, expected 2 got " + r.size());
203          }
204        }
205        byte[] firstValue = null;
206        byte[] secondValue = null;
207        int count = 0;
208        for (Cell kv : r.listCells()) {
209          if (count == 0) {
210            firstValue = CellUtil.cloneValue(kv);
211          } else if (count == 1) {
212            secondValue = CellUtil.cloneValue(kv);
213          } else if (count == 2) {
214            break;
215          }
216          count++;
217        }
218        String first = "";
219        if (firstValue == null) {
220          throw new NullPointerException(Bytes.toString(r.getRow()) + ": first value is null");
221        }
222        first = Bytes.toString(firstValue);
223        String second = "";
224        if (secondValue == null) {
225          throw new NullPointerException(Bytes.toString(r.getRow()) + ": second value is null");
226        }
227        byte[] secondReversed = new byte[secondValue.length];
228        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
229          secondReversed[i] = secondValue[j];
230        }
231        second = Bytes.toString(secondReversed);
232        if (first.compareTo(second) != 0) {
233          if (LOG.isDebugEnabled()) {
234            LOG.debug(
235              "second key is not the reverse of first. row=" + Bytes.toStringBinary(r.getRow())
236                + ", first value=" + first + ", second value=" + second);
237          }
238          fail();
239        }
240      }
241    } finally {
242      scanner.close();
243    }
244  }
245
246}