/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that a MapReduce scan restricted with {@link Scan#setTimeRange(long, long)} only hands
 * the mapper cell versions whose timestamps fall inside [MINSTAMP, MAXSTAMP).
 */
@Category({ MapReduceTests.class, LargeTests.class })
public class TestTimeRangeMapRed {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTimeRangeMapRed.class);

  private final static Logger log = LoggerFactory.getLogger(TestTimeRangeMapRed.class);
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private Admin admin;

  private static final byte[] KEY = Bytes.toBytes("row1");
  private static final NavigableMap<Long, Boolean> TIMESTAMP = new TreeMap<>();
  static {
    TIMESTAMP.put((long) 1245620000, false);
    TIMESTAMP.put((long) 1245620005, true); // include
    TIMESTAMP.put((long) 1245620010, true); // include
    TIMESTAMP.put((long) 1245620055, true); // include
    TIMESTAMP.put((long) 1245620100, true); // include
    TIMESTAMP.put((long) 1245620150, false);
    TIMESTAMP.put((long) 1245620250, false);
  }
  static final long MINSTAMP = 1245620005;
  static final long MAXSTAMP = 1245620100 + 1; // maxStamp is exclusive, so add one to include it

  static final TableName TABLE_NAME = TableName.valueOf("table123");
  static final byte[] FAMILY_NAME = Bytes.toBytes("text");
  static final byte[] COLUMN_NAME = Bytes.toBytes("input");

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void before() throws Exception {
    this.admin = UTIL.getAdmin();
  }

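  /**
   * Mapper for the time-range scan. For every cell version the scan delivers (only timestamps
   * inside [MINSTAMP, MAXSTAMP) should arrive here), it writes {@code true} back to the same row,
   * column, and timestamp, so verify() can tell which versions the scan actually produced.
   */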
  private static class ProcessTimeRangeMapper
    extends TableMapper<ImmutableBytesWritable, MapWritable> implements Configurable {

    private Configuration conf = null;
    private Connection connection = null;
    private Table table = null;

    @Override
    public void map(ImmutableBytesWritable key, Result result, Context context) throws IOException {
      List<Long> tsList = new ArrayList<>();
      for (Cell kv : result.listCells()) {
        tsList.add(kv.getTimestamp());
      }

      List<Put> puts = new ArrayList<>();
      for (Long ts : tsList) {
        Put put = new Put(key.get());
        put.setDurability(Durability.SKIP_WAL);
        put.addColumn(FAMILY_NAME, COLUMN_NAME, ts, Bytes.toBytes(true));
        puts.add(put);
      }
      table.put(puts);
    }

    @Override
    protected void cleanup(Context context) throws IOException {
      // Release the table and connection opened in setConf().
      if (table != null) {
        table.close();
      }
      if (connection != null) {
        connection.close();
      }
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration configuration) {
      this.conf = configuration;
      try {
        connection = ConnectionFactory.createConnection(conf);
        table = connection.getTable(TABLE_NAME);
      } catch (IOException e) {
        // setConf cannot throw a checked exception, so surface the failure unchecked
        // instead of swallowing it.
        throw new RuntimeException(e);
      }
    }
  }

  @Test
  public void testTimeRangeMapRed()
    throws IOException, InterruptedException, ClassNotFoundException {
    final TableDescriptor tableDescriptor =
      TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(FAMILY_NAME).setMaxVersions(Integer.MAX_VALUE).build()).build();
    admin.createTable(tableDescriptor);
    // Seed every timestamp with false; the mapper rewrites only the versions the scan sees.
    List<Put> puts = new ArrayList<>();
    for (Map.Entry<Long, Boolean> entry : TIMESTAMP.entrySet()) {
      Put put = new Put(KEY);
      put.setDurability(Durability.SKIP_WAL);
      put.addColumn(FAMILY_NAME, COLUMN_NAME, entry.getKey(), Bytes.toBytes(false));
      puts.add(put);
    }
    Table table = UTIL.getConnection().getTable(tableDescriptor.getTableName());
    table.put(puts);
    runTestOnTable();
    verify(table);
    table.close();
  }

  private void runTestOnTable() throws IOException, InterruptedException, ClassNotFoundException {
    Job job = null;
    try {
      job = Job.getInstance(UTIL.getConfiguration(), "test123");
      job.setOutputFormatClass(NullOutputFormat.class);
      job.setNumReduceTasks(0);
      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_NAME);
      scan.setTimeRange(MINSTAMP, MAXSTAMP);
      scan.readAllVersions();
      TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, ProcessTimeRangeMapper.class,
        Text.class, Text.class, job);
      org.junit.Assert.assertTrue(job.waitForCompletion(true));
    } finally {
      if (job != null) {
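        // Remove the job's temporary data under hadoop.tmp.dir now that the job is done.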
        FileUtil.fullyDelete(new File(job.getConfiguration().get("hadoop.tmp.dir")));
      }
    }
  }

  private void verify(final Table table) throws IOException {
    Scan scan = new Scan();
    scan.addColumn(FAMILY_NAME, COLUMN_NAME);
    scan.readVersions(1);
    ResultScanner scanner = table.getScanner(scan);
    for (Result r : scanner) {
      for (Cell kv : r.listCells()) {
        log.debug(Bytes.toString(r.getRow()) + "\t" + Bytes.toString(CellUtil.cloneFamily(kv))
          + "\t" + Bytes.toString(CellUtil.cloneQualifier(kv)) + "\t" + kv.getTimestamp() + "\t"
          + Bytes.toBoolean(CellUtil.cloneValue(kv)));
        org.junit.Assert.assertEquals(TIMESTAMP.get(kv.getTimestamp()),
          Bytes.toBoolean(CellUtil.cloneValue(kv)));
      }
    }
    scanner.close();
  }

}