/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that ImportTsv can attach a per-cell TTL via the HBASE_CELL_TTL column. A region
 * coprocessor installed on the mini cluster rejects any Put on a user table that does not carry a
 * TTL attribute, so the import job only succeeds if the TTL was propagated.
 */
@Category({ MapReduceTests.class, LargeTests.class })
public class TestImportTSVWithTTLs implements Configurable {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestImportTSVWithTTLs.class);

  protected static final Logger LOG = LoggerFactory.getLogger(TestImportTSVWithTTLs.class);
  protected static final String NAME = TestImportTsv.class.getSimpleName();
  protected static HBaseTestingUtil util = new HBaseTestingUtil();

  /**
   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is false.
   */
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";

  private final String FAMILY = "FAM";
  private static Configuration conf;

  @Rule
  public TestName name = new TestName();

  @Override
  public Configuration getConf() {
    return util.getConfiguration();
  }

  @Override
  public void setConf(Configuration conf) {
    throw new IllegalArgumentException("setConf not supported");
  }

  @BeforeClass
  public static void provisionCluster() throws Exception {
    conf = util.getConfiguration();
    // We don't check persistence in HFiles in this test, but if we ever do we will
    // need this where the default hfile version is not 3 (i.e. 0.98)
    conf.setInt("hfile.format.version", 3);
    conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName());
    util.startMiniCluster();
  }

  @AfterClass
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

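  /**
   * Imports a single row whose last column maps to HBASE_CELL_TTL (TTL value 1000000), using the
   * 0x1b byte as the field separator. The TTLCheckingObserver rejects puts without a TTL, so a
   * zero exit code from the job implies the attribute reached the region.
   */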
  @Test
  public void testMROnTable() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());

    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1);
    util.deleteTable(tableName);
  }

  protected static Tool doMROnTableTest(HBaseTestingUtil util, String family, String data,
    String[] args, int valueMultiplier) throws Exception {
    TableName table = TableName.valueOf(args[args.length - 1]);
    Configuration conf = new Configuration(util.getConfiguration());

    // populate input file
    FileSystem fs = FileSystem.get(conf);
    Path inputPath =
      fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
    FSDataOutputStream op = fs.create(inputPath, true);
    op.write(Bytes.toBytes(data));
    op.close();
    LOG.debug(String.format("Wrote test data to file: %s", inputPath));

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // run the import
    List<String> argv = new ArrayList<>(Arrays.asList(args));
    argv.add(inputPath.toString());
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: " + argv);
    try {
      // The job fails if the observer rejects any entry that lacks a TTL
      assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(new String[0])));
    } finally {
      // Clean up
      if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
        LOG.debug("Deleting test subdirectory");
        util.cleanupDataTestDirOnTestFS(table.getNameAsString());
      }
    }

    return tool;
  }

  public static class TTLCheckingObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

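    /**
     * Rejects any Put against a user table that does not carry an explicit TTL. Mutation#getTTL()
     * returns Long.MAX_VALUE when no TTL attribute has been set, so any other value means the
     * HBASE_CELL_TTL column was applied to the Put.
     */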
    @Override
    public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> e, Put put,
      WALEdit edit, Durability durability) throws IOException {
      Region region = e.getEnvironment().getRegion();
      if (
        !region.getRegionInfo().isMetaRegion() && !region.getRegionInfo().getTable().isSystemTable()
      ) {
        // The put carries the TTL attribute
        if (put.getTTL() != Long.MAX_VALUE) {
          return;
        }
        throw new IOException("Operation does not have TTL set");
      }
    }
  }
}