001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.*; 021import static org.mockito.Mockito.any; 022import static org.mockito.Mockito.anyBoolean; 023import static org.mockito.Mockito.mock; 024import static org.mockito.Mockito.when; 025 026import java.io.IOException; 027import java.net.Inet6Address; 028import java.net.InetAddress; 029import java.net.UnknownHostException; 030import java.util.Map; 031import java.util.TreeMap; 032import java.util.concurrent.ExecutorService; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.HRegionLocation; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.BufferedMutator; 042import org.apache.hadoop.hbase.client.BufferedMutatorParams; 043import org.apache.hadoop.hbase.client.ClusterConnection; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.RegionInfo; 046import org.apache.hadoop.hbase.client.RegionInfoBuilder; 047import org.apache.hadoop.hbase.client.RegionLocator; 048import org.apache.hadoop.hbase.client.Table; 049import org.apache.hadoop.hbase.client.TableBuilder; 050import org.apache.hadoop.hbase.security.User; 051import org.apache.hadoop.hbase.testclassification.SmallTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.Pair; 054import org.apache.hadoop.mapreduce.JobContext; 055import org.junit.ClassRule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.mockito.Mockito; 059import org.mockito.invocation.InvocationOnMock; 060import org.mockito.stubbing.Answer; 061 062@Category({ SmallTests.class }) 063public class TestTableInputFormatBase { 064 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestTableInputFormatBase.class); 068 069 @Test 070 public void testReuseRegionSizeCalculator() throws IOException { 071 JobContext context = mock(JobContext.class); 072 Configuration conf = HBaseConfiguration.create(); 073 conf.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL, 074 ConnectionForMergeTesting.class.getName()); 075 conf.set(TableInputFormat.INPUT_TABLE, "testTable"); 076 conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true); 077 when(context.getConfiguration()).thenReturn(conf); 078 079 TableInputFormat format = Mockito.spy(new TableInputFormatForMergeTesting()); 080 format.setConf(conf); 081 // initialize so that table is set, otherwise cloneOnFinish 082 // will be true and each getSplits call will re-initialize everything 083 format.initialize(context); 084 format.getSplits(context); 085 format.getSplits(context); 086 087 // re-initialize which will cause the next getSplits call to create a new RegionSizeCalculator 088 format.initialize(context); 089 format.getSplits(context); 090 format.getSplits(context); 091 092 // should only be 2 despite calling getSplits 4 times 093 Mockito.verify(format, Mockito.times(2)).createRegionSizeCalculator(Mockito.any(), 094 Mockito.any()); 095 } 096 097 @Test 098 public void testTableInputFormatBaseReverseDNSForIPv6() throws UnknownHostException { 099 String address = "ipv6.google.com"; 100 String localhost = null; 101 InetAddress addr = null; 102 TableInputFormat inputFormat = new TableInputFormat(); 103 try { 104 localhost = InetAddress.getByName(address).getCanonicalHostName(); 105 addr = Inet6Address.getByName(address); 106 } catch (UnknownHostException e) { 107 // google.com is down, we can probably forgive this test. 108 return; 109 } 110 System.out.println("Should retrun the hostname for this host " + localhost + " addr : " + addr); 111 String actualHostName = inputFormat.reverseDNS(addr); 112 assertEquals("Should retrun the hostname for this host. Expected : " + localhost + " Actual : " 113 + actualHostName, localhost, actualHostName); 114 } 115 116 @Test 117 public void testNonSuccessiveSplitsAreNotMerged() throws IOException { 118 JobContext context = mock(JobContext.class); 119 Configuration conf = HBaseConfiguration.create(); 120 conf.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL, 121 ConnectionForMergeTesting.class.getName()); 122 conf.set(TableInputFormat.INPUT_TABLE, "testTable"); 123 conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true); 124 when(context.getConfiguration()).thenReturn(conf); 125 126 TableInputFormat tifExclude = new TableInputFormatForMergeTesting(); 127 tifExclude.setConf(conf); 128 // split["b", "c"] is excluded, split["o", "p"] and split["p", "q"] are merged, 129 // but split["a", "b"] and split["c", "d"] are not merged. 130 assertEquals(ConnectionForMergeTesting.START_KEYS.length - 1 - 1, 131 tifExclude.getSplits(context).size()); 132 } 133 134 /** 135 * Subclass of {@link TableInputFormat} to use in {@link #testNonSuccessiveSplitsAreNotMerged}. 136 * This class overrides {@link TableInputFormatBase#includeRegionInSplit} to exclude specific 137 * splits. 138 */ 139 private static class TableInputFormatForMergeTesting extends TableInputFormat { 140 private byte[] prefixStartKey = Bytes.toBytes("b"); 141 private byte[] prefixEndKey = Bytes.toBytes("c"); 142 private RegionSizeCalculator sizeCalculator; 143 144 /** 145 * Exclude regions which contain rows starting with "b". 146 */ 147 @Override 148 protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) { 149 if ( 150 Bytes.compareTo(startKey, prefixEndKey) < 0 && (Bytes.compareTo(prefixStartKey, endKey) < 0 151 || Bytes.equals(endKey, HConstants.EMPTY_END_ROW)) 152 ) { 153 return false; 154 } else { 155 return true; 156 } 157 } 158 159 @Override 160 protected void initializeTable(Connection connection, TableName tableName) throws IOException { 161 super.initializeTable(connection, tableName); 162 ConnectionForMergeTesting cft = (ConnectionForMergeTesting) connection; 163 sizeCalculator = cft.getRegionSizeCalculator(); 164 } 165 166 @Override 167 protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin) 168 throws IOException { 169 return sizeCalculator; 170 } 171 } 172 173 /** 174 * Connection class to use in {@link #testNonSuccessiveSplitsAreNotMerged}. This class returns 175 * mocked {@link Table}, {@link RegionLocator}, {@link RegionSizeCalculator}, and {@link Admin}. 176 */ 177 private static class ConnectionForMergeTesting implements Connection { 178 public static final byte[][] SPLITS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"), 179 Bytes.toBytes("c"), Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), 180 Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"), 181 Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"), Bytes.toBytes("n"), 182 Bytes.toBytes("o"), Bytes.toBytes("p"), Bytes.toBytes("q"), Bytes.toBytes("r"), 183 Bytes.toBytes("s"), Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"), 184 Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"), Bytes.toBytes("z") }; 185 186 public static final byte[][] START_KEYS; 187 public static final byte[][] END_KEYS; 188 static { 189 START_KEYS = new byte[SPLITS.length + 1][]; 190 START_KEYS[0] = HConstants.EMPTY_BYTE_ARRAY; 191 for (int i = 0; i < SPLITS.length; i++) { 192 START_KEYS[i + 1] = SPLITS[i]; 193 } 194 195 END_KEYS = new byte[SPLITS.length + 1][]; 196 for (int i = 0; i < SPLITS.length; i++) { 197 END_KEYS[i] = SPLITS[i]; 198 } 199 END_KEYS[SPLITS.length] = HConstants.EMPTY_BYTE_ARRAY; 200 } 201 202 public static final Map<byte[], Long> SIZE_MAP = new TreeMap<>(Bytes.BYTES_COMPARATOR); 203 static { 204 for (byte[] startKey : START_KEYS) { 205 SIZE_MAP.put(startKey, 1024L * 1024L * 1024L); 206 } 207 SIZE_MAP.put(Bytes.toBytes("a"), 200L * 1024L * 1024L); 208 SIZE_MAP.put(Bytes.toBytes("b"), 200L * 1024L * 1024L); 209 SIZE_MAP.put(Bytes.toBytes("c"), 200L * 1024L * 1024L); 210 SIZE_MAP.put(Bytes.toBytes("o"), 200L * 1024L * 1024L); 211 SIZE_MAP.put(Bytes.toBytes("p"), 200L * 1024L * 1024L); 212 } 213 214 ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user, 215 Map<String, byte[]> connectionAttributes) throws IOException { 216 } 217 218 @Override 219 public void abort(String why, Throwable e) { 220 } 221 222 @Override 223 public boolean isAborted() { 224 return false; 225 } 226 227 @Override 228 public Configuration getConfiguration() { 229 throw new UnsupportedOperationException(); 230 } 231 232 @Override 233 public Table getTable(TableName tableName) throws IOException { 234 Table table = mock(Table.class); 235 when(table.getName()).thenReturn(tableName); 236 return table; 237 } 238 239 @Override 240 public Table getTable(TableName tableName, ExecutorService pool) throws IOException { 241 throw new UnsupportedOperationException(); 242 } 243 244 @Override 245 public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { 246 throw new UnsupportedOperationException(); 247 } 248 249 @Override 250 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { 251 throw new UnsupportedOperationException(); 252 } 253 254 @Override 255 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 256 final Map<byte[], HRegionLocation> locationMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 257 for (byte[] startKey : START_KEYS) { 258 HRegionLocation hrl = 259 new HRegionLocation(RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).build(), 260 ServerName.valueOf("localhost", 0, 0)); 261 locationMap.put(startKey, hrl); 262 } 263 264 RegionLocator locator = mock(RegionLocator.class); 265 when(locator.getRegionLocation(any(byte[].class), anyBoolean())) 266 .thenAnswer(new Answer<HRegionLocation>() { 267 @Override 268 public HRegionLocation answer(InvocationOnMock invocationOnMock) throws Throwable { 269 Object[] args = invocationOnMock.getArguments(); 270 byte[] key = (byte[]) args[0]; 271 return locationMap.get(key); 272 } 273 }); 274 when(locator.getStartEndKeys()) 275 .thenReturn(new Pair<byte[][], byte[][]>(START_KEYS, END_KEYS)); 276 return locator; 277 } 278 279 public RegionSizeCalculator getRegionSizeCalculator() { 280 RegionSizeCalculator sizeCalculator = mock(RegionSizeCalculator.class); 281 when(sizeCalculator.getRegionSize(any(byte[].class))).thenAnswer(new Answer<Long>() { 282 @Override 283 public Long answer(InvocationOnMock invocationOnMock) throws Throwable { 284 Object[] args = invocationOnMock.getArguments(); 285 byte[] regionId = (byte[]) args[0]; 286 byte[] startKey = RegionInfo.getStartKey(regionId); 287 return SIZE_MAP.get(startKey); 288 } 289 }); 290 return sizeCalculator; 291 } 292 293 @Override 294 public Admin getAdmin() throws IOException { 295 Admin admin = mock(Admin.class); 296 // return non-null admin to pass null checks 297 return admin; 298 } 299 300 @Override 301 public void close() throws IOException { 302 } 303 304 @Override 305 public boolean isClosed() { 306 return false; 307 } 308 309 @Override 310 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 311 throw new UnsupportedOperationException(); 312 } 313 314 @Override 315 public void clearRegionLocationCache() { 316 } 317 318 @Override 319 public String getClusterId() { 320 return null; 321 } 322 } 323}