/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;

/**
 * Sample Uploader MapReduce
 * <p>
 * This is EXAMPLE code. You will need to change it to work for your context. It was originally
 * written against HBase 0.21 trunk.
 * <p>
 * Puts the data into HBase via {@link TableOutputFormat}, set up by {@link TableMapReduceUtil}.
 * Change the InputFormat to suit your data. In this example, we import a CSV file in which each
 * line has the form
 *
 * <pre>
 * row,family,qualifier,value
 * </pre>
 * <p>
 * The table and column family we are inserting into must already exist.
 * <p>
 * There is no reducer in this example as it is not necessary and would add significant overhead.
 * If you need to do any massaging of data before inserting into HBase, you can do this in the map
 * as well.
 * <p>
 * Do the following to start the MR job:
 *
 * <pre>
 * ./bin/hadoop org.apache.hadoop.hbase.mapreduce.SampleUploader /tmp/input.csv TABLE_NAME
 * </pre>
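 * <p>
 * For example, assuming a column family named <code>f</code> in the target table, input lines
 * might look like:
 *
 * <pre>
 * row1,f,q1,value1
 * row2,f,q2,value2
 * </pre>
 * <p>
 * Such a table could be created beforehand from the HBase shell, e.g. with
 * <code>create 'TABLE_NAME', 'f'</code> (here <code>f</code> is just an example family name).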
 */
@InterfaceAudience.Private
public class SampleUploader extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(SampleUploader.class);

  private static final String NAME = "SampleUploader";

  static class Uploader extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private long checkpoint = 100;
    private long count = 0;

    @Override
    public void map(LongWritable key, Text line, Context context) throws IOException {
      // Input is a CSV file
      // Each map() processes a single line, where the key is the byte offset of the line
      // Each line is comma-delimited; row,family,qualifier,value

      // Split CSV line
      List<String> values = Splitter.on(',').splitToList(line.toString());
      if (values.size() != 4) {
        return;
      }
      Iterator<String> i = values.iterator();
      // Extract each value
      byte[] row = Bytes.toBytes(i.next());
      byte[] family = Bytes.toBytes(i.next());
      byte[] qualifier = Bytes.toBytes(i.next());
      byte[] value = Bytes.toBytes(i.next());

      // Create Put
      Put put = new Put(row);
      put.addColumn(family, qualifier, value);

      // Uncomment below to skip writing to the WAL. This will improve performance but means
      // you will experience data loss in the case of a RegionServer crash.
      // put.setDurability(Durability.SKIP_WAL); // needs org.apache.hadoop.hbase.client.Durability

      try {
        context.write(new ImmutableBytesWritable(row), put);
      } catch (InterruptedException e) {
        LOG.error("Interrupted emitting put", e);
        Thread.currentThread().interrupt();
      }

      // Set status every checkpoint lines
      if (++count % checkpoint == 0) {
        context.setStatus("Emitting Put " + count);
      }
    }
  }

  /**
   * Job configuration.
   */
  public static Job configureJob(Configuration conf, String[] args) throws IOException {
    Path inputPath = new Path(args[0]);
    String tableName = args[1];
    Job job = Job.getInstance(conf, NAME + "_" + tableName);
    job.setJarByClass(Uploader.class);
    FileInputFormat.setInputPaths(job, inputPath);
    // Plain-text CSV input: keys are byte offsets, values are the lines themselves.
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(Uploader.class);
    // No reducers. Just write straight to table. Call initTableReducerJob
    // because it sets up the TableOutputFormat.
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
    return job;
  }

  /**
   * Main entry point.
   * @param otherArgs The command line parameters remaining after ToolRunner has handled the
   *                  standard generic options.
   * @throws Exception When running the job fails.
   */
  @Override
  public int run(String[] otherArgs) throws Exception {
    if (otherArgs.length != 2) {
      System.err.println("Wrong number of arguments: " + otherArgs.length);
      System.err.println("Usage: " + NAME + " <input> <tablename>");
      return -1;
    }
    Job job = configureJob(getConf(), otherArgs);
    return (job.waitForCompletion(true) ? 0 : 1);
  }

  public static void main(String[] args) throws Exception {
    int status = ToolRunner.run(HBaseConfiguration.create(), new SampleUploader(), args);
    System.exit(status);
  }
}