/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;

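/**
 * Tests for the mapred flavour of {@link TableMapReduceUtil}: limiting map/reduce task counts to
 * the number of regions of a table, and running small classification map/reduce jobs against a
 * table pre-loaded with "president" and "actor" rows.
 */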
@Category({ MapReduceTests.class, LargeTests.class })
public class TestTableMapReduceUtil {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableMapReduceUtil.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableMapReduceUtil.class);

  private static Table presidentsTable;
  private static final String TABLE_NAME = "People";

  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
  private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name");

  private static ImmutableSet<String> presidentsRowKeys =
    ImmutableSet.of("president1", "president2", "president3");
  private static Iterator<String> presidentNames =
    ImmutableSet.of("John F. Kennedy", "George W. Bush", "Barack Obama").iterator();

  private static ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1", "actor2");
  private static Iterator<String> actorNames =
    ImmutableSet.of("Jack Nicholson", "Martin Freeman").iterator();

  private static String PRESIDENT_PATTERN = "president";
  private static String ACTOR_PATTERN = "actor";
  private static ImmutableMap<String, ImmutableSet<String>> relation =
    ImmutableMap.of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
    presidentsTable = createAndFillTable(TableName.valueOf(TABLE_NAME));
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }

  public static Table createAndFillTable(TableName tableName) throws IOException {
    Table table = UTIL.createTable(tableName, COLUMN_FAMILY);
    createPutCommand(table);
    return table;
  }

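  /**
   * Writes one row per president and actor row key, pulling the next available name from the
   * corresponding iterator so each row key is paired with exactly one name.
   */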
  private static void createPutCommand(Table table) throws IOException {
    for (String president : presidentsRowKeys) {
      if (presidentNames.hasNext()) {
        Put p = new Put(Bytes.toBytes(president));
        p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(presidentNames.next()));
        table.put(p);
      }
    }

    for (String actor : actorsRowKeys) {
      if (actorNames.hasNext()) {
        Put p = new Put(Bytes.toBytes(actor));
        p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next()));
        table.put(p);
      }
    }
  }

  /**
   * Check that the number of reduce tasks for the given job configuration does not exceed the
   * number of regions of the given table.
   */
  @Test
  public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable() throws IOException {
    Assert.assertNotNull(presidentsTable);
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.setScannerCaching(jobConf, 100);
    assertEquals(1, jobConf.getNumReduceTasks());
    assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0));

    jobConf.setNumReduceTasks(10);
    TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
    assertEquals(1, jobConf.getNumReduceTasks());
  }

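  /**
   * Check that the number of map tasks for the given job configuration does not exceed the number
   * of regions of the given table.
   */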
  @Test
  public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable() throws IOException {
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
    assertEquals(1, jobConf.getNumMapTasks());

    jobConf.setNumMapTasks(10);
    TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
    assertEquals(1, jobConf.getNumMapTasks());
  }

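  /**
   * Runs a single-reducer classification job end to end against the pre-loaded table and asserts
   * that the job completes successfully.
   */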
  @Test
  @SuppressWarnings("deprecation")
  public void shouldBeValidMapReduceEvaluation() throws Exception {
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    try {
      jobConf.setJobName("process row task");
      jobConf.setNumReduceTasks(1);
      TableMapReduceUtil.initTableMapJob(TABLE_NAME, Bytes.toString(COLUMN_FAMILY),
        ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, jobConf);
      TableMapReduceUtil.initTableReduceJob(TABLE_NAME, ClassificatorRowReduce.class, jobConf);
      RunningJob job = JobClient.runJob(jobConf);
      assertTrue(job.isSuccessful());
    } finally {
      if (jobConf != null) FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
    }
  }

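  /**
   * Same classification job as above, but with two reduce tasks and {@link HRegionPartitioner}
   * wired in, asserting the job still completes successfully.
   */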
  @Test
  @SuppressWarnings("deprecation")
  public void shouldBeValidMapReduceWithPartitionerEvaluation() throws IOException {
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    try {
      jobConf.setJobName("process row task");
      jobConf.setNumReduceTasks(2);
      TableMapReduceUtil.initTableMapJob(TABLE_NAME, Bytes.toString(COLUMN_FAMILY),
        ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, jobConf);

      TableMapReduceUtil.initTableReduceJob(TABLE_NAME, ClassificatorRowReduce.class, jobConf,
        HRegionPartitioner.class);
      RunningJob job = JobClient.runJob(jobConf);
      assertTrue(job.isSuccessful());
    } finally {
      if (jobConf != null) FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
    }
  }

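  /**
   * Reducer that checks, for each classification key ("president" or "actor"), that the number of
   * Puts grouped under the key matches the number of row keys expected for that key.
   */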
  @SuppressWarnings("deprecation")
  static class ClassificatorRowReduce extends MapReduceBase
    implements TableReduce<ImmutableBytesWritable, Put> {

    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
      OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter) throws IOException {
      String strKey = Bytes.toString(key.get());
      List<Put> result = new ArrayList<>();
      while (values.hasNext()) {
        result.add(values.next());
      }

      if (relation.containsKey(strKey)) {
        Set<String> set = relation.get(strKey);
        if (set != null) {
          assertEquals(set.size(), result.size());
        } else {
          throwAssertionError("Test infrastructure error: set is null");
        }
      } else {
        throwAssertionError("Test infrastructure error: key not found in map");
      }
    }

    private void throwAssertionError(String errorMessage) throws AssertionError {
      throw new AssertionError(errorMessage);
    }
  }

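  /**
   * Mapper that classifies each row by its key prefix ("president" or "actor") and emits a Put
   * carrying the stored name under that classification key.
   */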
  @SuppressWarnings("deprecation")
  static class ClassificatorMapper extends MapReduceBase
    implements TableMap<ImmutableBytesWritable, Put> {

    @Override
    public void map(ImmutableBytesWritable row, Result result,
      OutputCollector<ImmutableBytesWritable, Put> outCollector, Reporter reporter)
      throws IOException {
      String rowKey = Bytes.toString(result.getRow());
      final ImmutableBytesWritable pKey =
        new ImmutableBytesWritable(Bytes.toBytes(PRESIDENT_PATTERN));
      final ImmutableBytesWritable aKey = new ImmutableBytesWritable(Bytes.toBytes(ACTOR_PATTERN));
      ImmutableBytesWritable outKey = null;

      if (rowKey.startsWith(PRESIDENT_PATTERN)) {
        outKey = pKey;
      } else if (rowKey.startsWith(ACTOR_PATTERN)) {
        outKey = aKey;
      } else {
        throw new AssertionError("unexpected rowKey");
      }

      String name = Bytes.toString(result.getValue(COLUMN_FAMILY, COLUMN_QUALIFIER));
      outCollector.collect(outKey, new Put(Bytes.toBytes("rowKey2")).addColumn(COLUMN_FAMILY,
        COLUMN_QUALIFIER, Bytes.toBytes(name)));
    }
  }
}