001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.mockito.ArgumentMatchers.any; 022import static org.mockito.ArgumentMatchers.anyBoolean; 023import static org.mockito.Mockito.mock; 024import static org.mockito.Mockito.when; 025 026import java.io.IOException; 027import java.net.Inet6Address; 028import java.net.InetAddress; 029import java.net.UnknownHostException; 030import java.util.Map; 031import java.util.TreeMap; 032import java.util.concurrent.ExecutorService; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.HRegionLocation; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.AsyncConnection; 042import org.apache.hadoop.hbase.client.BufferedMutator; 043import org.apache.hadoop.hbase.client.BufferedMutatorParams; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.ConnectionRegistry; 046import org.apache.hadoop.hbase.client.ConnectionUtils; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.RegionInfoBuilder; 049import org.apache.hadoop.hbase.client.RegionLocator; 050import org.apache.hadoop.hbase.client.Table; 051import org.apache.hadoop.hbase.client.TableBuilder; 052import org.apache.hadoop.hbase.security.User; 053import org.apache.hadoop.hbase.testclassification.SmallTests; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.hbase.util.Pair; 056import org.apache.hadoop.mapreduce.JobContext; 057import org.junit.ClassRule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.mockito.Mockito; 061import org.mockito.invocation.InvocationOnMock; 062import org.mockito.stubbing.Answer; 063 064@Category({ SmallTests.class }) 065public class TestTableInputFormatBase { 066 067 @ClassRule 068 public static final HBaseClassTestRule CLASS_RULE = 069 HBaseClassTestRule.forClass(TestTableInputFormatBase.class); 070 071 @Test 072 public void testReuseRegionSizeCalculator() throws IOException { 073 JobContext context = mock(JobContext.class); 074 Configuration conf = HBaseConfiguration.create(); 075 conf.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, 076 ConnectionForMergeTesting.class.getName()); 077 conf.set(TableInputFormat.INPUT_TABLE, "testTable"); 078 conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true); 079 when(context.getConfiguration()).thenReturn(conf); 080 081 TableInputFormat format = Mockito.spy(new TableInputFormatForMergeTesting()); 082 format.setConf(conf); 083 // initialize so that table is set, otherwise cloneOnFinish 084 // will be true and each getSplits call will re-initialize everything 085 format.initialize(context); 086 format.getSplits(context); 087 format.getSplits(context); 088 089 // re-initialize which will cause the next getSplits call to create a new RegionSizeCalculator 090 format.initialize(context); 091 format.getSplits(context); 092 format.getSplits(context); 093 094 // should only be 2 despite calling getSplits 4 times 095 Mockito.verify(format, Mockito.times(2)).createRegionSizeCalculator(Mockito.any(), 096 Mockito.any()); 097 } 098 099 @Test 100 public void testTableInputFormatBaseReverseDNSForIPv6() throws UnknownHostException { 101 String address = "ipv6.google.com"; 102 String localhost = null; 103 InetAddress addr = null; 104 TableInputFormat inputFormat = new TableInputFormat(); 105 try { 106 localhost = InetAddress.getByName(address).getCanonicalHostName(); 107 addr = Inet6Address.getByName(address); 108 } catch (UnknownHostException e) { 109 // google.com is down, we can probably forgive this test. 110 return; 111 } 112 System.out.println("Should retrun the hostname for this host " + localhost + " addr : " + addr); 113 String actualHostName = inputFormat.reverseDNS(addr); 114 assertEquals("Should retrun the hostname for this host. Expected : " + localhost + " Actual : " 115 + actualHostName, localhost, actualHostName); 116 } 117 118 @Test 119 public void testNonSuccessiveSplitsAreNotMerged() throws IOException { 120 JobContext context = mock(JobContext.class); 121 Configuration conf = HBaseConfiguration.create(); 122 conf.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, 123 ConnectionForMergeTesting.class.getName()); 124 conf.set(TableInputFormat.INPUT_TABLE, "testTable"); 125 conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true); 126 when(context.getConfiguration()).thenReturn(conf); 127 128 TableInputFormat tifExclude = new TableInputFormatForMergeTesting(); 129 tifExclude.setConf(conf); 130 // split["b", "c"] is excluded, split["o", "p"] and split["p", "q"] are merged, 131 // but split["a", "b"] and split["c", "d"] are not merged. 132 assertEquals(ConnectionForMergeTesting.START_KEYS.length - 1 - 1, 133 tifExclude.getSplits(context).size()); 134 } 135 136 /** 137 * Subclass of {@link TableInputFormat} to use in {@link #testNonSuccessiveSplitsAreNotMerged}. 138 * This class overrides {@link TableInputFormatBase#includeRegionInSplit} to exclude specific 139 * splits. 140 */ 141 private static class TableInputFormatForMergeTesting extends TableInputFormat { 142 private byte[] prefixStartKey = Bytes.toBytes("b"); 143 private byte[] prefixEndKey = Bytes.toBytes("c"); 144 private RegionSizeCalculator sizeCalculator; 145 146 /** 147 * Exclude regions which contain rows starting with "b". 148 */ 149 @Override 150 protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) { 151 if ( 152 Bytes.compareTo(startKey, prefixEndKey) < 0 && (Bytes.compareTo(prefixStartKey, endKey) < 0 153 || Bytes.equals(endKey, HConstants.EMPTY_END_ROW)) 154 ) { 155 return false; 156 } else { 157 return true; 158 } 159 } 160 161 @Override 162 protected void initializeTable(Connection connection, TableName tableName) throws IOException { 163 super.initializeTable(connection, tableName); 164 ConnectionForMergeTesting cft = (ConnectionForMergeTesting) connection; 165 sizeCalculator = cft.getRegionSizeCalculator(); 166 } 167 168 @Override 169 protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin) 170 throws IOException { 171 return sizeCalculator; 172 } 173 } 174 175 /** 176 * Connection class to use in {@link #testNonSuccessiveSplitsAreNotMerged}. This class returns 177 * mocked {@link Table}, {@link RegionLocator}, {@link RegionSizeCalculator}, and {@link Admin}. 178 */ 179 private static class ConnectionForMergeTesting implements Connection { 180 public static final byte[][] SPLITS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"), 181 Bytes.toBytes("c"), Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), 182 Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"), 183 Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"), Bytes.toBytes("n"), 184 Bytes.toBytes("o"), Bytes.toBytes("p"), Bytes.toBytes("q"), Bytes.toBytes("r"), 185 Bytes.toBytes("s"), Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"), 186 Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"), Bytes.toBytes("z") }; 187 188 public static final byte[][] START_KEYS; 189 public static final byte[][] END_KEYS; 190 static { 191 START_KEYS = new byte[SPLITS.length + 1][]; 192 START_KEYS[0] = HConstants.EMPTY_BYTE_ARRAY; 193 for (int i = 0; i < SPLITS.length; i++) { 194 START_KEYS[i + 1] = SPLITS[i]; 195 } 196 197 END_KEYS = new byte[SPLITS.length + 1][]; 198 for (int i = 0; i < SPLITS.length; i++) { 199 END_KEYS[i] = SPLITS[i]; 200 } 201 END_KEYS[SPLITS.length] = HConstants.EMPTY_BYTE_ARRAY; 202 } 203 204 public static final Map<byte[], Long> SIZE_MAP = new TreeMap<>(Bytes.BYTES_COMPARATOR); 205 static { 206 for (byte[] startKey : START_KEYS) { 207 SIZE_MAP.put(startKey, 1024L * 1024L * 1024L); 208 } 209 SIZE_MAP.put(Bytes.toBytes("a"), 200L * 1024L * 1024L); 210 SIZE_MAP.put(Bytes.toBytes("b"), 200L * 1024L * 1024L); 211 SIZE_MAP.put(Bytes.toBytes("c"), 200L * 1024L * 1024L); 212 SIZE_MAP.put(Bytes.toBytes("o"), 200L * 1024L * 1024L); 213 SIZE_MAP.put(Bytes.toBytes("p"), 200L * 1024L * 1024L); 214 } 215 216 ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user, 217 ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException { 218 } 219 220 @Override 221 public void abort(String why, Throwable e) { 222 } 223 224 @Override 225 public boolean isAborted() { 226 return false; 227 } 228 229 @Override 230 public Configuration getConfiguration() { 231 throw new UnsupportedOperationException(); 232 } 233 234 @Override 235 public Table getTable(TableName tableName) throws IOException { 236 Table table = mock(Table.class); 237 when(table.getName()).thenReturn(tableName); 238 return table; 239 } 240 241 @Override 242 public Table getTable(TableName tableName, ExecutorService pool) throws IOException { 243 throw new UnsupportedOperationException(); 244 } 245 246 @Override 247 public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { 248 throw new UnsupportedOperationException(); 249 } 250 251 @Override 252 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { 253 throw new UnsupportedOperationException(); 254 } 255 256 @Override 257 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 258 final Map<byte[], HRegionLocation> locationMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 259 for (byte[] startKey : START_KEYS) { 260 HRegionLocation hrl = 261 new HRegionLocation(RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).build(), 262 ServerName.valueOf("localhost", 0, 0)); 263 locationMap.put(startKey, hrl); 264 } 265 266 RegionLocator locator = mock(RegionLocator.class); 267 when(locator.getRegionLocation(any(byte[].class), anyBoolean())) 268 .thenAnswer(new Answer<HRegionLocation>() { 269 @Override 270 public HRegionLocation answer(InvocationOnMock invocationOnMock) throws Throwable { 271 Object[] args = invocationOnMock.getArguments(); 272 byte[] key = (byte[]) args[0]; 273 return locationMap.get(key); 274 } 275 }); 276 when(locator.getStartEndKeys()) 277 .thenReturn(new Pair<byte[][], byte[][]>(START_KEYS, END_KEYS)); 278 return locator; 279 } 280 281 public RegionSizeCalculator getRegionSizeCalculator() { 282 RegionSizeCalculator sizeCalculator = mock(RegionSizeCalculator.class); 283 when(sizeCalculator.getRegionSize(any(byte[].class))).thenAnswer(new Answer<Long>() { 284 @Override 285 public Long answer(InvocationOnMock invocationOnMock) throws Throwable { 286 Object[] args = invocationOnMock.getArguments(); 287 byte[] regionId = (byte[]) args[0]; 288 byte[] startKey = RegionInfo.getStartKey(regionId); 289 return SIZE_MAP.get(startKey); 290 } 291 }); 292 return sizeCalculator; 293 } 294 295 @Override 296 public Admin getAdmin() throws IOException { 297 Admin admin = mock(Admin.class); 298 // return non-null admin to pass null checks 299 return admin; 300 } 301 302 @Override 303 public void close() throws IOException { 304 } 305 306 @Override 307 public boolean isClosed() { 308 return false; 309 } 310 311 @Override 312 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 313 throw new UnsupportedOperationException(); 314 } 315 316 @Override 317 public void clearRegionLocationCache() { 318 } 319 320 @Override 321 public AsyncConnection toAsyncConnection() { 322 throw new UnsupportedOperationException(); 323 } 324 325 @Override 326 public String getClusterId() { 327 return null; 328 } 329 } 330}