/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs {@link ImportTsv} against a mini cluster whose regions are guarded by
 * {@link OperationAttributesTestController}, a coprocessor that bypasses every user-table
 * {@link Put} that does not carry the {@code "test"} operation attribute. The tests import one TSV
 * row whose last column is mapped to {@code HBASE_ATTRIBUTES_KEY} and then check whether the row
 * became visible in the table.
 * <p>
 * NOTE(review): the mapper named in the job arguments
 * ({@code TsvImporterCustomTestMapperForOprAttr}) is defined elsewhere; presumably it copies the
 * parsed attributes onto the Put it emits -- confirm against that class.
 */
@Category({ MapReduceTests.class, LargeTests.class })
public class TestImportTSVWithOperationAttributes implements Configurable {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestImportTSVWithOperationAttributes.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestImportTSVWithOperationAttributes.class);
  // The configuration keys below are prefixed with the simple name of TestImportTsv, not of this
  // class.
  protected static final String NAME = TestImportTsv.class.getSimpleName();
  protected static HBaseTestingUtil util = new HBaseTestingUtil();

  /**
   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
   */
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";

  private static Configuration conf;

  /** Name of the operation attribute a Put must carry for the coprocessor to let it through. */
  private static final String TEST_ATR_KEY = "test";

  /** Column family created in every test table and scanned during validation. */
  private static final String FAMILY = "FAM";

  @Rule
  public TestName name = new TestName();

  /**
   * Returns the configuration of the shared testing utility.
   */
  @Override
  public Configuration getConf() {
    return util.getConfiguration();
  }

  /**
   * Not supported; the configuration always comes from the shared testing utility.
   * @throws IllegalArgumentException always
   */
  @Override
  public void setConf(Configuration conf) {
    throw new IllegalArgumentException("setConf not supported");
  }

  /**
   * Registers {@link OperationAttributesTestController} under both the master and the region
   * coprocessor keys and starts the mini cluster.
   */
  @BeforeClass
  public static void provisionCluster() throws Exception {
    conf = util.getConfiguration();
    conf.set("hbase.coprocessor.master.classes", OperationAttributesTestController.class.getName());
    conf.set("hbase.coprocessor.region.classes", OperationAttributesTestController.class.getName());
    util.startMiniCluster();
  }

  /**
   * Shuts the mini cluster down.
   */
  @AfterClass
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Imports a row whose attribute column is {@code test=>myvalue} and expects the row to be present
   * in the table afterwards.
   */
  @Test
  public void testMROnTable() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());

    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY
        + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest=>myvalue\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1, true);
    util.deleteTable(tableName);
  }

  /**
   * Imports a row whose attribute column is {@code test1=>myvalue}, i.e. the attribute key differs
   * from the one the coprocessor checks for, and expects the table to stay empty.
   */
  @Test
  public void testMROnTableWithInvalidOperationAttr() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());

    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY
        + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest1=>myvalue\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1, false);
    util.deleteTable(tableName);
  }

  /**
   * Run an ImportTsv job and perform basic validation on the results. Returns the ImportTsv
   * <code>Tool</code> instance so that other tests can inspect it for further validation as
   * necessary. This method is static to ensure non-reliance on instance's util/conf facilities.
   * @param util            testing utility whose configuration and test file system are used
   * @param family          column family to validate
   * @param data            content written to the job's input file
   * @param args            any arguments to pass BEFORE the inputFile path is appended; the last
   *                        element must be the table name
   * @param valueMultiplier multiplier applied to the numeric suffix of the expected values
   * @param dataAvailable   whether the imported row is expected to be present after the job
   * @return The Tool instance used to run the test.
   */
  private static Tool doMROnTableTest(HBaseTestingUtil util, String family, String data,
    String[] args, int valueMultiplier, boolean dataAvailable) throws Exception {
    String table = args[args.length - 1];
    Configuration conf = new Configuration(util.getConfiguration());

    // populate input file
    FileSystem fs = FileSystem.get(conf);
    Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat"));
    // try-with-resources so the stream is closed even when the write fails.
    try (FSDataOutputStream op = fs.create(inputPath, true)) {
      op.write(Bytes.toBytes(data));
    }
    LOG.debug("Wrote test data to file: {}", inputPath);

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // run the import
    List<String> argv = new ArrayList<>(Arrays.asList(args));
    argv.add(inputPath.toString());
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: {}", argv);
    assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(new String[0])));

    validateTable(conf, TableName.valueOf(table), family, valueMultiplier, dataAvailable);

    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
      LOG.debug("Deleting test subdirectory");
      util.cleanupDataTestDirOnTestFS(table);
    }
    return tool;
  }

  /**
   * Confirm ImportTsv via data in online table. The scan is retried up to
   * {@code hbase.client.retries.number} times (default 5), sleeping {@code hbase.client.pause}
   * milliseconds (default 5000) between attempts, whenever it fails with a
   * {@link NullPointerException}.
   * @param conf            configuration used to connect and to read the retry settings
   * @param tableName       table to scan
   * @param family          column family to scan
   * @param valueMultiplier multiplier applied to the numeric suffix of the expected values
   * @param dataAvailable   {@code true} to require the imported row, {@code false} to require an
   *                        empty table
   * @throws IOException if connecting to or scanning the table fails
   */
  private static void validateTable(Configuration conf, TableName tableName, String family,
    int valueMultiplier, boolean dataAvailable) throws IOException {

    LOG.debug("Validating table.");
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    // Connection and table are released even when one of the assertions below fails.
    try (Connection connection = ConnectionFactory.createConnection(conf);
      Table table = connection.getTable(tableName)) {
      for (int i = 0; i < numRetries; i++) {
        Scan scan = new Scan();
        // Scan entire family.
        scan.addFamily(Bytes.toBytes(family));
        try (ResultScanner resScanner = table.getScanner(scan)) {
          verified =
            dataAvailable ? verifyImportedRows(resScanner, valueMultiplier) : verifyNoRows(resScanner);
          break;
        } catch (NullPointerException e) {
          // If here, a cell was empty. Presume its because updates came in
          // after the scanner had been opened. Wait a while and retry.
        }
        try {
          Thread.sleep(pause);
        } catch (InterruptedException e) {
          // Preserve the interrupt for the caller and stop retrying; the final assertion
          // reports the failure.
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    assertTrue(verified);
  }

  /**
   * Asserts that every result holds exactly the two expected cells of row {@code KEY}.
   * @return {@code true} if at least one result was returned by the scanner
   */
  private static boolean verifyImportedRows(ResultScanner resScanner, int valueMultiplier) {
    boolean sawRow = false;
    for (Result res : resScanner) {
      LOG.debug("Getting results {}", res.size());
      assertEquals(2, res.size());
      List<Cell> kvs = res.listCells();
      assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY")));
      assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY")));
      assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
      assertTrue(CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
      // Only one result set is expected, so let it loop.
      sawRow = true;
    }
    return sawRow;
  }

  /**
   * Asserts that the scanner returns no results.
   * @return always {@code true}; a non-empty table fails the assertion instead
   */
  private static boolean verifyNoRows(ResultScanner resScanner) throws IOException {
    Result[] next = resScanner.next(2);
    assertEquals(0, next.length);
    return true;
  }

  /**
   * Region coprocessor that only lets a {@link Put} on a user table proceed when it carries the
   * {@code "test"} operation attribute; every other such Put is bypassed.
   */
  public static class OperationAttributesTestController
    implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    /**
     * Bypasses the Put unless it targets the meta region or a system table, or carries the
     * {@code "test"} attribute.
     */
    @Override
    public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> e, Put put,
      WALEdit edit, Durability durability) throws IOException {
      Region region = e.getEnvironment().getRegion();
      if (
        !region.getRegionInfo().isMetaRegion() && !region.getRegionInfo().getTable().isSystemTable()
      ) {
        if (put.getAttribute(TEST_ATR_KEY) != null) {
          LOG.debug("allow any put to happen " + region.getRegionInfo().getRegionNameAsString());
        } else {
          e.bypass();
        }
      }
    }
  }
}