001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.IOException; 021import java.lang.reflect.Constructor; 022import java.lang.reflect.InvocationTargetException; 023import java.lang.reflect.Method; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.mapred.JobConf; 026import org.apache.hadoop.mapred.MiniMRCluster; 027import org.apache.hadoop.mapreduce.Job; 028import org.apache.hadoop.mapreduce.JobContext; 029import org.apache.hadoop.mapreduce.JobID; 030 031/** 032 * This class provides shims for HBase to interact with the Hadoop 1.0.x and the Hadoop 0.23.x 033 * series. NOTE: No testing done against 0.22.x, or 0.21.x. 034 */ 035abstract public class MapreduceTestingShim { 036 private static MapreduceTestingShim instance; 037 private static Class[] emptyParam = new Class[] {}; 038 039 static { 040 try { 041 // This class exists in hadoop 0.22+ but not in Hadoop 20.x/1.x 042 Class c = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); 043 instance = new MapreduceV2Shim(); 044 } catch (Exception e) { 045 instance = new MapreduceV1Shim(); 046 } 047 } 048 049 abstract public JobContext newJobContext(Configuration jobConf) throws IOException; 050 051 abstract public Job newJob(Configuration conf) throws IOException; 052 053 abstract public JobConf obtainJobConf(MiniMRCluster cluster); 054 055 abstract public String obtainMROutputDirProp(); 056 057 public static JobContext createJobContext(Configuration jobConf) throws IOException { 058 return instance.newJobContext(jobConf); 059 } 060 061 public static JobConf getJobConf(MiniMRCluster cluster) { 062 return instance.obtainJobConf(cluster); 063 } 064 065 public static Job createJob(Configuration conf) throws IOException { 066 return instance.newJob(conf); 067 } 068 069 public static String getMROutputDirProp() { 070 return instance.obtainMROutputDirProp(); 071 } 072 073 private static class MapreduceV1Shim extends MapreduceTestingShim { 074 @Override 075 public JobContext newJobContext(Configuration jobConf) throws IOException { 076 // Implementing: 077 // return new JobContext(jobConf, new JobID()); 078 JobID jobId = new JobID(); 079 Constructor<JobContext> c; 080 try { 081 c = JobContext.class.getConstructor(Configuration.class, JobID.class); 082 return c.newInstance(jobConf, jobId); 083 } catch (Exception e) { 084 throw new IllegalStateException( 085 "Failed to instantiate new JobContext(jobConf, new JobID())", e); 086 } 087 } 088 089 @Override 090 public Job newJob(Configuration conf) throws IOException { 091 // Implementing: 092 // return new Job(conf); 093 Constructor<Job> c; 094 try { 095 c = Job.class.getConstructor(Configuration.class); 096 return c.newInstance(conf); 097 } catch (Exception e) { 098 throw new IllegalStateException("Failed to instantiate new Job(conf)", e); 099 } 100 } 101 102 @Override 103 public JobConf obtainJobConf(MiniMRCluster cluster) { 104 if (cluster == null) return null; 105 try { 106 Object runner = cluster.getJobTrackerRunner(); 107 Method meth = runner.getClass().getDeclaredMethod("getJobTracker", emptyParam); 108 Object tracker = meth.invoke(runner, new Object[] {}); 109 Method m = tracker.getClass().getDeclaredMethod("getConf", emptyParam); 110 return (JobConf) m.invoke(tracker, new Object[] {}); 111 } catch (NoSuchMethodException nsme) { 112 return null; 113 } catch (InvocationTargetException ite) { 114 return null; 115 } catch (IllegalAccessException iae) { 116 return null; 117 } 118 } 119 120 @Override 121 public String obtainMROutputDirProp() { 122 return "mapred.output.dir"; 123 } 124 } 125 126 private static class MapreduceV2Shim extends MapreduceTestingShim { 127 @Override 128 public JobContext newJobContext(Configuration jobConf) { 129 return newJob(jobConf); 130 } 131 132 @Override 133 public Job newJob(Configuration jobConf) { 134 // Implementing: 135 // return Job.getInstance(jobConf); 136 try { 137 Method m = Job.class.getMethod("getInstance", Configuration.class); 138 return (Job) m.invoke(null, jobConf); // static method, then arg 139 } catch (Exception e) { 140 e.printStackTrace(); 141 throw new IllegalStateException("Failed to return from Job.getInstance(jobConf)"); 142 } 143 } 144 145 @Override 146 public JobConf obtainJobConf(MiniMRCluster cluster) { 147 try { 148 Method meth = MiniMRCluster.class.getMethod("getJobTrackerConf", emptyParam); 149 return (JobConf) meth.invoke(cluster, new Object[] {}); 150 } catch (NoSuchMethodException nsme) { 151 return null; 152 } catch (InvocationTargetException ite) { 153 return null; 154 } catch (IllegalAccessException iae) { 155 return null; 156 } 157 } 158 159 @Override 160 public String obtainMROutputDirProp() { 161 // This is a copy of o.a.h.mapreduce.lib.output.FileOutputFormat.OUTDIR 162 // from Hadoop 0.23.x. If we use the source directly we break the hadoop 1.x compile. 163 return "mapreduce.output.fileoutputformat.outputdir"; 164 } 165 } 166}