/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;

/**
 * Validate ImportTsv + BulkLoadHFiles on a distributed cluster.
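 * <p>
 * A rough sketch of how this test is typically launched against a live cluster; the
 * {@code hbase} launcher script and the classpath setup are deployment assumptions, not
 * something this class defines:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestImportTsv
 * </pre>
 *
 * See {@link #run(String[])} for the supported options, including the
 * {@code IntegrationTestImportTsv.generatedHFileFolder} property.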
 */
@Category(IntegrationTests.class)
public class IntegrationTestImportTsv extends Configured implements Tool {

  private static final String NAME = IntegrationTestImportTsv.class.getSimpleName();
  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestImportTsv.class);
  private static final String GENERATED_HFILE_FOLDER_PARAM_KEY =
    "IntegrationTestImportTsv.generatedHFileFolder";

  protected static final String simple_tsv = "row1\t1\tc1\tc2\n" + "row2\t1\tc1\tc2\n"
    + "row3\t1\tc1\tc2\n" + "row4\t1\tc1\tc2\n" + "row5\t1\tc1\tc2\n" + "row6\t1\tc1\tc2\n"
    + "row7\t1\tc1\tc2\n" + "row8\t1\tc1\tc2\n" + "row9\t1\tc1\tc2\n" + "row10\t1\tc1\tc2\n";

  @Rule
  public TestName name = new TestName();

  protected static final Set<KeyValue> simple_expected =
    new TreeSet<KeyValue>(CellComparator.getInstance()) {
      private static final long serialVersionUID = 1L;
      {
        byte[] family = Bytes.toBytes("d");
        for (String line : Splitter.on('\n').split(simple_tsv)) {
          if (Strings.isNullOrEmpty(line)) {
            continue;
          }
          String[] row = line.split("\t");
          byte[] key = Bytes.toBytes(row[0]);
          long ts = Long.parseLong(row[1]);
          byte[][] fields = { Bytes.toBytes(row[2]), Bytes.toBytes(row[3]) };
          add(new KeyValue(key, family, fields[0], ts, KeyValue.Type.Put, fields[0]));
          add(new KeyValue(key, family, fields[1], ts, KeyValue.Type.Put, fields[1]));
        }
      }
    };
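
  // A worked example of the expansion above (for illustration only, derived from simple_tsv):
  // the line "row1\t1\tc1\tc2" becomes two expected cells under family "d",
  //   (row=row1, column=d:c1, timestamp=1, value="c1") and
  //   (row=row1, column=d:c2, timestamp=1, value="c2").
  // doLoadIncrementalHFiles below scans the loaded table in CellComparator order and asserts a
  // cell-for-cell match against this set.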

  // this instance is initialized on first access when the test is run from
  // JUnit/Maven or by main when run from the CLI.
  protected static IntegrationTestingUtility util = null;

  @Override
  public Configuration getConf() {
    return util.getConfiguration();
  }

  @Override
  public void setConf(Configuration conf) {
    LOG.debug("Ignoring setConf call.");
  }

  @BeforeClass
  public static void provisionCluster() throws Exception {
    if (null == util) {
      util = new IntegrationTestingUtility();
    }
    util.initializeCluster(1);
    if (!util.isDistributedCluster()) {
      // also need MR when running without a real cluster
      util.startMiniMapReduceCluster();
    }
  }

  @AfterClass
  public static void releaseCluster() throws Exception {
    util.restoreCluster();
    if (!util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
    util = null;
  }

  /**
   * Bulk-load the HFiles under <code>hfiles</code> into <code>tableName</code> with
   * {@link BulkLoadHFilesTool}, then verify the table contents match
   * <code>simple_expected</code>.
   */
  protected void doLoadIncrementalHFiles(Path hfiles, TableName tableName) throws Exception {

    String[] args = { hfiles.toString(), tableName.getNameAsString() };
    LOG.info(format("Running BulkLoadHFilesTool with args: %s", Arrays.asList(args)));
    assertEquals("Loading HFiles failed.", 0,
      ToolRunner.run(new BulkLoadHFilesTool(getConf()), args));

    Table table = null;
    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setCaching(1000);

    try {
      table = util.getConnection().getTable(tableName);
      Iterator<Result> resultsIt = table.getScanner(scan).iterator();
      Iterator<KeyValue> expectedIt = simple_expected.iterator();
      while (resultsIt.hasNext() && expectedIt.hasNext()) {
        Result r = resultsIt.next();
        for (Cell actual : r.rawCells()) {
          assertTrue("Ran out of expected values prematurely!", expectedIt.hasNext());
          KeyValue expected = expectedIt.next();
          assertEquals("Scan produced surprising result", 0,
            CellComparator.getInstance().compare(expected, actual));
        }
      }
      assertFalse("Did not consume all expected values.", expectedIt.hasNext());
      assertFalse("Did not consume all scan results.", resultsIt.hasNext());
    } finally {
      if (null != table) {
        table.close();
      }
    }
  }

  /**
   * Confirm the absence of the {@link TotalOrderPartitioner} partitions file.
   */
  protected static void validateDeletedPartitionsFile(Configuration conf) throws IOException {
    if (!conf.getBoolean(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, false)) {
      return;
    }

    FileSystem fs = FileSystem.get(conf);
    Path partitionsFile = new Path(TotalOrderPartitioner.getPartitionFile(conf));
    assertFalse("Failed to clean up partitions file.", fs.exists(partitionsFile));
  }

  @Test
  public void testGenerateAndLoad() throws Exception {
    generateAndLoad(TableName.valueOf(name.getMethodName()));
  }

  void generateAndLoad(final TableName table) throws Exception {
    LOG.info("Running test testGenerateAndLoad.");
    String cf = "d";
    Path hfiles = initGeneratedHFilePath(table);
    LOG.info("The folder where the HFiles will be generated: {}", hfiles.toString());

    Map<String, String> args = new HashMap<>();
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, format("HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2", cf, cf));
    // configure the test harness to NOT delete the HFiles after they're
    // generated. We need those for doLoadIncrementalHFiles
    args.put(TestImportTsv.DELETE_AFTER_LOAD_CONF, "false");

    // run the job, complete the load.
    util.createTable(table, new String[] { cf });
    Tool t = TestImportTsv.doMROnTableTest(util, table, cf, simple_tsv, args);
    doLoadIncrementalHFiles(hfiles, table);

    // validate post-conditions
    validateDeletedPartitionsFile(t.getConf());

    // clean up after ourselves.
    util.deleteTable(table);
    util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    LOG.info("testGenerateAndLoad completed successfully.");
  }
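
  // Roughly the command-line equivalent of the generateAndLoad() wiring above. This is a
  // sketch for illustration only: TestImportTsv.doMROnTableTest passes the same options
  // programmatically, and the <hfiles dir>/<table name>/<input dir> placeholders are
  // assumptions, not values this class defines.
  //
  //   hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
  //     -Dimporttsv.columns=HBASE_ROW_KEY,HBASE_TS_KEY,d:c1,d:c2 \
  //     -Dimporttsv.bulk.output=<hfiles dir> \
  //     <table name> <input dir>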

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 0) {
      System.err.println(format("%s [genericOptions]", NAME));
      System.err.println("  Runs ImportTsv integration tests against a distributed cluster.");
      System.err.println();
      System.err.println("  Use '-D" + GENERATED_HFILE_FOLDER_PARAM_KEY + "=<path>' to define a");
      System.err.println("  base folder for the generated HFiles. If HDFS Transparent Encryption");
      System.err.println("  is configured, then make sure to set this parameter to a folder in");
      System.err.println("  the same encryption zone in HDFS as the HBase root directory,");
      System.err.println("  otherwise the bulkload will fail.");
      System.err.println();
      ToolRunner.printGenericCommandUsage(System.err);
      return 1;
    }

    // adding more test methods? Don't forget to add them here... or consider doing what
    // IntegrationTestsDriver does.
    provisionCluster();
    TableName tableName = TableName.valueOf("IntegrationTestImportTsv");
    if (util.getAdmin().tableExists(tableName)) {
      util.deleteTable(tableName);
    }
    generateAndLoad(tableName);
    releaseCluster();

    return 0;
  }

  private Path initGeneratedHFilePath(final TableName table) throws IOException {
    String folderParam = getConf().getTrimmed(GENERATED_HFILE_FOLDER_PARAM_KEY);
    if (folderParam == null || folderParam.isEmpty()) {
      // by default, fall back to the test data dir
      return new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles");
    }

    Path hfiles = new Path(folderParam, UUID.randomUUID().toString());
    FileSystem fs = util.getTestFileSystem();
    String shouldPreserve = System.getProperty("hbase.testing.preserve.testdir", "false");
    if (!Boolean.parseBoolean(shouldPreserve)) {
      // schedule the generated HFiles for deletion on exit, on whichever
      // filesystem they were written to
      if (fs.getUri().getScheme().equals(FileSystem.getLocal(getConf()).getUri().getScheme())) {
        File localFolder = new File(hfiles.toString());
        localFolder.deleteOnExit();
      } else {
        fs.deleteOnExit(hfiles);
      }
    }
    return hfiles;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    util = new IntegrationTestingUtility(conf);
    int status = ToolRunner.run(conf, new IntegrationTestImportTsv(), args);
    System.exit(status);
  }
}