/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
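/**
 * mapred (old MapReduce API) flavor of the TableSnapshotInputFormat tests; the shared test
 * scenarios come from {@link TableSnapshotInputFormatTestBase}.
 */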
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);

  private static final byte[] aaa = Bytes.toBytes("aaa");
  private static final byte[] after_zzz = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
  private static final String COLUMNS =
    Bytes.toString(FAMILIES[0]) + " " + Bytes.toString(FAMILIES[1]);

  @Rule
  public TestName name = new TestName();

  @Override
  protected byte[] getStartRow() {
    return aaa;
  }

  @Override
  protected byte[] getEndRow() {
    return after_zzz;
  }

  /** Verifies each row handed to the map phase and re-emits the row key. */
  static class TestTableSnapshotMapper extends MapReduceBase
    implements TableMap<ImmutableBytesWritable, NullWritable> {
    @Override
    public void map(ImmutableBytesWritable key, Result value,
      OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
      throws IOException {
      verifyRowFromMap(key, value);
      collector.collect(key, NullWritable.get());
    }
  }
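  /**
   * Records every row key that reaches the reduce phase and, on close(), validates that the whole
   * [aaa, after_zzz) key range was seen.
   */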
  public static class TestTableSnapshotReducer extends MapReduceBase
    implements Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtil.SeenRowTracker rowTracker =
      new HBaseTestingUtil.SeenRowTracker(aaa, after_zzz);

    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<NullWritable> values,
      OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
      throws IOException {
      rowTracker.addRow(key.get());
    }

    @Override
    public void close() {
      rowTracker.validate();
    }
  }

  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      JobConf job = new JobConf(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
        false, tmpTableDir);

      // TODO: would be better to examine directly the cache instance that results from this
      // config. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals("Snapshot job should be configured for default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals("Snapshot job should not use BucketCache.", 0,
        job.getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  // TODO: mapred does not support limiting the input range by startrow and endrow, so the
  // following tests must override parameter verification.

  @Test
  @Override
  public void testWithMockedMapReduceMultiRegion() throws Exception {
    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10, true);
    // It does not matter whether true or false is given to setLocalityEnabledTo,
    // because it is not read in testWithMockedMapReduce().
  }

  @Test
  @Override
  public void testWithMapReduceMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 1, 10, false);
  }

  @Test
  @Override
  // run the MR job while HBase is offline
  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 1, 10, true);
  }

  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
    String snapshotName, Path tmpTableDir) throws Exception {
    JobConf job = new JobConf(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      tmpTableDir);
  }

  @Override
  protected void testWithMockedMapReduce(HBaseTestingUtil util, String snapshotName,
    int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
    throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(util, tableName, snapshotName, getStartRow(), getEndRow(),
        numRegions);

      JobConf job = new JobConf(util.getConfiguration());
      // setLocalityEnabledTo is ignored no matter what is specified, so as to test the case
      // where SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified and the
      // default value is taken.
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir);
      }

      // mapred doesn't support start and end keys? o.O
      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
    }
  }
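  /**
   * Computes the splits directly from TableSnapshotInputFormat, then checks the split count, the
   * reported split locations, and that every row in [startRow, stopRow) can be read back through
   * the record reader.
   */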
  private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits,
    byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    InputSplit[] splits = tsif.getSplits(job, 0);

    Assert.assertEquals(expectedNumSplits, splits.length);

    HBaseTestingUtil.SeenRowTracker rowTracker =
      new HBaseTestingUtil.SeenRowTracker(startRow, stopRow);

    // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified,
    // so the default value is taken.
    boolean localityEnabled = SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;

    for (int i = 0; i < splits.length; i++) {
      // validate input split
      InputSplit split = splits[i];
      Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);
      if (localityEnabled) {
        // When localityEnabled is true, we would like to verify split.getLocations() with:
        // Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
        // However, getLocations() of some splits could return an empty array (length 0),
        // so we drop the check on the length.
        // TODO: investigate how to verify split.getLocations() when localityEnabled is true
        Assert.assertTrue(split.getLocations() != null);
      } else {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      // validate record reader
      OutputCollector collector = mock(OutputCollector.class);
      Reporter reporter = mock(Reporter.class);
      RecordReader<ImmutableBytesWritable, Result> rr = tsif.getRecordReader(split, job, reporter);

      // validate we can read all the data back
      ImmutableBytesWritable key = rr.createKey();
      Result value = rr.createValue();
      while (rr.next(key, value)) {
        verifyRowFromMap(key, value);
        rowTracker.addRow(key.copyBytes());
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtil util, TableName tableName,
    String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
    int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }
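  /**
   * Runs a real MapReduce job over the snapshot, optionally with the mini cluster shut down
   * first, and relies on TestTableSnapshotReducer to validate that all rows were read. This is
   * also called by the IntegrationTestTableSnapshotInputFormat.
   */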
  public static void doTestWithMapReduce(HBaseTestingUtil util, TableName tableName,
    String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {

    // create the table and snapshot
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      JobConf jobConf = new JobConf(util.getConfiguration());

      jobConf.setJarByClass(util.getClass());
      org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(jobConf,
        TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class,
          jobConf, true, tableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class,
          jobConf, true, tableDir);
      }

      jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      jobConf.setNumReduceTasks(1);
      jobConf.setOutputFormat(NullOutputFormat.class);

      RunningJob job = JobClient.runJob(jobConf);
      Assert.assertTrue(job.isSuccessful());
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

  @Ignore // Ignored in mapred package because it keeps failing but allowed in mapreduce package.
  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }
}