001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.hbase.client.Scan.SCAN_ATTRIBUTES_TABLE_NAME; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Map; 026import java.util.TreeMap; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.atomic.AtomicInteger; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.HRegionLocation; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Admin; 037import org.apache.hadoop.hbase.client.AsyncConnection; 038import org.apache.hadoop.hbase.client.BufferedMutator; 039import org.apache.hadoop.hbase.client.BufferedMutatorParams; 040import org.apache.hadoop.hbase.client.Connection; 041import org.apache.hadoop.hbase.client.ConnectionRegistry; 042import org.apache.hadoop.hbase.client.ConnectionUtils; 043import org.apache.hadoop.hbase.client.RegionInfoBuilder; 044import org.apache.hadoop.hbase.client.RegionLocator; 045import org.apache.hadoop.hbase.client.Result; 046import org.apache.hadoop.hbase.client.Scan; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.client.TableBuilder; 049import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 050import org.apache.hadoop.hbase.security.User; 051import org.apache.hadoop.hbase.testclassification.SmallTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.Pair; 054import org.apache.hadoop.mapreduce.InputSplit; 055import org.apache.hadoop.mapreduce.JobContext; 056import org.apache.hadoop.mapreduce.RecordReader; 057import org.apache.hadoop.mapreduce.TaskAttemptContext; 058import org.junit.Assert; 059import org.junit.ClassRule; 060import org.junit.Rule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.junit.rules.TestName; 064import org.mockito.Mockito; 065import org.mockito.invocation.InvocationOnMock; 066import org.mockito.stubbing.Answer; 067 068/** 069 * Tests of MultiTableInputFormatBase. 070 */ 071@Category({ SmallTests.class }) 072public class TestMultiTableInputFormatBase { 073 074 @ClassRule 075 public static final HBaseClassTestRule CLASS_RULE = 076 HBaseClassTestRule.forClass(TestMultiTableInputFormatBase.class); 077 078 @Rule 079 public final TestName name = new TestName(); 080 081 /** 082 * Test getSplits only puts up one Connection. In past it has put up many Connections. Each 083 * Connection setup comes with a fresh new cache so we have to do fresh hit on hbase:meta. Should 084 * only do one Connection when doing getSplits even if a MultiTableInputFormat. 085 */ 086 @Test 087 public void testMRSplitsConnectionCount() throws IOException { 088 // Make instance of MTIFB. 089 MultiTableInputFormatBase mtif = new MultiTableInputFormatBase() { 090 @Override 091 public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split, 092 TaskAttemptContext context) throws IOException, InterruptedException { 093 return super.createRecordReader(split, context); 094 } 095 }; 096 // Pass it a mocked JobContext. Make the JC return our Configuration. 097 // Load the Configuration so it returns our special Connection so we can interpolate 098 // canned responses. 099 JobContext mockedJobContext = Mockito.mock(JobContext.class); 100 Configuration c = HBaseConfiguration.create(); 101 c.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, MRSplitsConnection.class.getName()); 102 Mockito.when(mockedJobContext.getConfiguration()).thenReturn(c); 103 // Invent a bunch of scans. Have each Scan go against a different table so a good spread. 104 List<Scan> scans = new ArrayList<>(); 105 for (int i = 0; i < 10; i++) { 106 Scan scan = new Scan(); 107 String tableName = this.name.getMethodName() + i; 108 scan.setAttribute(SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName)); 109 scans.add(scan); 110 } 111 mtif.setScans(scans); 112 // Get splits. Assert that that more than one. 113 List<InputSplit> splits = mtif.getSplits(mockedJobContext); 114 Assert.assertTrue(splits.size() > 0); 115 // Assert only one Connection was made (see the static counter we have in the mocked 116 // Connection MRSplitsConnection Constructor. 117 Assert.assertEquals(1, MRSplitsConnection.creations.get()); 118 } 119 120 /** 121 * Connection to use above in Test. 122 */ 123 public static class MRSplitsConnection implements Connection { 124 private final Configuration configuration; 125 static final AtomicInteger creations = new AtomicInteger(0); 126 127 MRSplitsConnection(Configuration conf, ExecutorService pool, User user, 128 ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException { 129 this.configuration = conf; 130 creations.incrementAndGet(); 131 } 132 133 @Override 134 public void abort(String why, Throwable e) { 135 136 } 137 138 @Override 139 public boolean isAborted() { 140 return false; 141 } 142 143 @Override 144 public Configuration getConfiguration() { 145 return this.configuration; 146 } 147 148 @Override 149 public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { 150 return null; 151 } 152 153 @Override 154 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { 155 return null; 156 } 157 158 @Override 159 public RegionLocator getRegionLocator(final TableName tableName) throws IOException { 160 // Make up array of start keys. We start off w/ empty byte array. 161 final byte[][] startKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaaa"), 162 Bytes.toBytes("bbb"), Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), 163 Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), 164 Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), 165 Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), Bytes.toBytes("sss"), 166 Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("zzz") }; 167 // Make an array of end keys. We end with the empty byte array. 168 final byte[][] endKeys = 169 new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"), 170 Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"), 171 Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), 172 Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), 173 Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), 174 Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), HConstants.EMPTY_BYTE_ARRAY }; 175 // Now make a map of start keys to HRegionLocations. Let the server namber derive from 176 // the start key. 177 final Map<byte[], HRegionLocation> map = 178 new TreeMap<byte[], HRegionLocation>(Bytes.BYTES_COMPARATOR); 179 for (byte[] startKey : startKeys) { 180 HRegionLocation hrl = 181 new HRegionLocation(RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).build(), 182 ServerName.valueOf(Bytes.toString(startKey), 0, 0)); 183 map.put(startKey, hrl); 184 } 185 // Get a list of the locations. 186 final List<HRegionLocation> locations = new ArrayList<HRegionLocation>(map.values()); 187 // Now make a RegionLocator mock backed by the abpve map and list of locations. 188 RegionLocator mockedRegionLocator = Mockito.mock(RegionLocator.class); 189 Mockito 190 .when( 191 mockedRegionLocator.getRegionLocation(Mockito.any(byte[].class), Mockito.anyBoolean())) 192 .thenAnswer(new Answer<HRegionLocation>() { 193 @Override 194 public HRegionLocation answer(InvocationOnMock invocationOnMock) throws Throwable { 195 Object[] args = invocationOnMock.getArguments(); 196 byte[] key = (byte[]) args[0]; 197 return map.get(key); 198 } 199 }); 200 Mockito.when(mockedRegionLocator.getAllRegionLocations()).thenReturn(locations); 201 Mockito.when(mockedRegionLocator.getStartEndKeys()) 202 .thenReturn(new Pair<byte[][], byte[][]>(startKeys, endKeys)); 203 Mockito.when(mockedRegionLocator.getName()).thenReturn(tableName); 204 return mockedRegionLocator; 205 } 206 207 @Override 208 public Admin getAdmin() throws IOException { 209 Admin admin = Mockito.mock(Admin.class); 210 Mockito.when(admin.getConfiguration()).thenReturn(getConfiguration()); 211 return admin; 212 } 213 214 @Override 215 public Table getTable(TableName tableName) throws IOException { 216 Table table = Mockito.mock(Table.class); 217 Mockito.when(table.getName()).thenReturn(tableName); 218 return table; 219 } 220 221 @Override 222 public void close() throws IOException { 223 224 } 225 226 @Override 227 public boolean isClosed() { 228 return false; 229 } 230 231 @Override 232 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 233 return Mockito.mock(TableBuilder.class); 234 } 235 236 @Override 237 public void clearRegionLocationCache() { 238 } 239 240 @Override 241 public AsyncConnection toAsyncConnection() { 242 return null; 243 } 244 245 @Override 246 public String getClusterId() { 247 return null; 248 } 249 250 } 251}