/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Convert Map/Reduce output and write it to an HBase table
 */
@InterfaceAudience.Public
public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {

  /** JobConf parameter that specifies the output table */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  /**
   * Writes Reduce output values ({@link Put}s) to an HBase table through a
   * {@link BufferedMutator}. The output key is not used.
   */
  protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
    private BufferedMutator m_mutator;
    /** Connection owned by this writer, or {@code null} when the mutator was supplied externally. */
    private Connection conn;

    /**
     * Instantiate a TableRecordWriter around an externally created BufferedMutator. This writer
     * does not hold a connection, so {@link #close(Reporter)} only closes the mutator.
     * @param mutator the mutator that receives every written {@link Put}
     * @deprecated since 2.0.0 and will be removed in 3.0.0. Please use
     *             {@link #TableRecordWriter(JobConf)} instead. This version does not clean up
     *             connections and will leak connections (removed in 2.0).
     * @see <a href="https://issues.apache.org/jira/browse/HBASE-16774">HBASE-16774</a>
     */
    @Deprecated
    public TableRecordWriter(final BufferedMutator mutator) throws IOException {
      this.m_mutator = mutator;
      this.conn = null;
    }

    /**
     * Instantiate a TableRecordWriter that opens its own connection from the job configuration and
     * obtains a BufferedMutator for the table named by {@link #OUTPUT_TABLE}.
     * @param job job configuration; supplies the connection settings and the output table name
     * @throws IOException if the connection or the mutator cannot be created
     */
    public TableRecordWriter(JobConf job) throws IOException {
      TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
      try {
        this.conn = ConnectionFactory.createConnection(job);
        this.m_mutator = conn.getBufferedMutator(tableName);
      } finally {
        // Only release the connection when we failed to obtain a mutator. The null check on conn
        // matters: if createConnection itself threw, conn is still null and an unguarded close()
        // would raise a NullPointerException that hides the original failure.
        if (this.m_mutator == null && this.conn != null) {
          this.conn.close();
          this.conn = null;
        }
      }
    }

    /**
     * Closes the mutator and then, if this writer owns one, the connection. The connection is
     * closed in a {@code finally} block so it is released even when closing the mutator fails.
     * @param reporter not used by this implementation
     * @throws IOException if closing the mutator or the connection fails
     */
    @Override
    public void close(Reporter reporter) throws IOException {
      try {
        if (this.m_mutator != null) {
          this.m_mutator.close();
        }
      } finally {
        if (conn != null) {
          this.conn.close();
        }
      }
    }

    /**
     * Hands a copy of {@code value} to the underlying mutator.
     * @param key not used by this implementation
     * @param value the mutation to write
     * @throws IOException if the mutator rejects the mutation
     */
    @Override
    public void write(ImmutableBytesWritable key, Put value) throws IOException {
      // NOTE(review): the defensive copy is presumably there because the framework may reuse the
      // value instance between calls -- confirm before removing.
      m_mutator.mutate(new Put(value));
    }
  }

  /**
   * Creates a new record writer. Be aware that the baseline javadoc gives the impression that there
   * is a single {@link RecordWriter} per job but in HBase, it is more natural if we give you a new
   * RecordWriter per call of this method. You must close the returned RecordWriter when done.
   * Failure to do so will drop writes.
   * @param ignored Ignored filesystem
   * @param job Current JobConf
   * @param name Name of the job
   * @param progress not used by this implementation
   * @return The newly created writer instance.
   * @throws IOException When creating the writer fails.
   */
  @Override
  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
    Progressable progress) throws IOException {
    // Clear write buffer on fail is true by default so no need to reset it.
    return new TableRecordWriter(job);
  }

  /**
   * Verifies that the job names an output table via {@link #OUTPUT_TABLE}.
   * @param ignored Ignored filesystem
   * @param job Current JobConf
   * @throws IOException if {@link #OUTPUT_TABLE} is not set in {@code job}
   */
  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job)
    throws FileAlreadyExistsException, InvalidJobConfException, IOException {
    String tableName = job.get(OUTPUT_TABLE);
    if (tableName == null) {
      throw new IOException("Must specify table name");
    }
  }
}