001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.List; 027import java.util.Map; 028import java.util.NavigableMap; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtil; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.Get; 034import org.apache.hadoop.hbase.client.Increment; 035import org.apache.hadoop.hbase.client.Put; 036import org.apache.hadoop.hbase.client.Result; 037import org.apache.hadoop.hbase.client.Row; 038import org.apache.hadoop.hbase.client.Table; 039import org.apache.hadoop.hbase.io.TimeRange; 040import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 041import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 042import org.apache.hadoop.hbase.testclassification.MediumTests; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 045import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 046import org.junit.After; 047import org.junit.AfterClass; 048import org.junit.Before; 049import org.junit.BeforeClass; 050import org.junit.ClassRule; 051import org.junit.Test; 052import org.junit.experimental.categories.Category; 053 054/** 055 * This test runs batch mutation with Increments which have custom TimeRange. Custom Observer 056 * records the TimeRange. We then verify that the recorded TimeRange has same bounds as the initial 057 * TimeRange. See HBASE-15698 058 */ 059@Category({ CoprocessorTests.class, MediumTests.class }) 060public class TestIncrementTimeRange { 061 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestIncrementTimeRange.class); 065 066 private static final HBaseTestingUtil util = new HBaseTestingUtil(); 067 private static ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); 068 069 private static final TableName TEST_TABLE = TableName.valueOf("test"); 070 private static final byte[] TEST_FAMILY = Bytes.toBytes("f1"); 071 072 private static final byte[] ROW_A = Bytes.toBytes("aaa"); 073 private static final byte[] ROW_B = Bytes.toBytes("bbb"); 074 private static final byte[] ROW_C = Bytes.toBytes("ccc"); 075 076 private static final byte[] qualifierCol1 = Bytes.toBytes("col1"); 077 078 private static final byte[] bytes1 = Bytes.toBytes(1); 079 private static final byte[] bytes2 = Bytes.toBytes(2); 080 private static final byte[] bytes3 = Bytes.toBytes(3); 081 082 private Table hTableInterface; 083 private Table table; 084 085 @BeforeClass 086 public static void setupBeforeClass() throws Exception { 087 util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 088 MyObserver.class.getName()); 089 // Make general delay zero rather than default. Timing is off in this 090 // test that depends on an evironment edge that is manually moved forward. 091 util.getConfiguration().setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 0); 092 util.startMiniCluster(); 093 EnvironmentEdgeManager.injectEdge(mee); 094 } 095 096 @AfterClass 097 public static void tearDownAfterClass() throws Exception { 098 util.shutdownMiniCluster(); 099 } 100 101 @Before 102 public void before() throws Exception { 103 table = util.createTable(TEST_TABLE, TEST_FAMILY); 104 105 Put puta = new Put(ROW_A); 106 puta.addColumn(TEST_FAMILY, qualifierCol1, bytes1); 107 table.put(puta); 108 109 Put putb = new Put(ROW_B); 110 putb.addColumn(TEST_FAMILY, qualifierCol1, bytes2); 111 table.put(putb); 112 113 Put putc = new Put(ROW_C); 114 putc.addColumn(TEST_FAMILY, qualifierCol1, bytes3); 115 table.put(putc); 116 } 117 118 @After 119 public void after() throws Exception { 120 try { 121 if (table != null) { 122 table.close(); 123 } 124 } finally { 125 try { 126 util.deleteTable(TEST_TABLE); 127 } catch (IOException ioe) { 128 } 129 } 130 } 131 132 public static class MyObserver extends SimpleRegionObserver { 133 static TimeRange tr10 = null, tr2 = null; 134 135 @Override 136 public Result preIncrement(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 137 final Increment increment) throws IOException { 138 NavigableMap<byte[], List<Cell>> map = increment.getFamilyCellMap(); 139 for (Map.Entry<byte[], List<Cell>> entry : map.entrySet()) { 140 for (Cell cell : entry.getValue()) { 141 long incr = 142 Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); 143 if (incr == 10) { 144 tr10 = increment.getTimeRange(); 145 } else if (incr == 2 && !increment.getTimeRange().isAllTime()) { 146 tr2 = increment.getTimeRange(); 147 } 148 } 149 } 150 return super.preIncrement(e, increment); 151 } 152 } 153 154 @Test 155 public void testHTableInterfaceMethods() throws Exception { 156 hTableInterface = util.getConnection().getTable(TEST_TABLE); 157 checkHTableInterfaceMethods(); 158 } 159 160 private void checkHTableInterfaceMethods() throws Exception { 161 long time = EnvironmentEdgeManager.currentTime(); 162 mee.setValue(time); 163 hTableInterface.put(new Put(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, Bytes.toBytes(1L))); 164 checkRowValue(ROW_A, Bytes.toBytes(1L)); 165 166 time = EnvironmentEdgeManager.currentTime(); 167 mee.setValue(time); 168 TimeRange range10 = TimeRange.between(1, time + 10); 169 hTableInterface.increment(new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 10L) 170 .setTimeRange(range10.getMin(), range10.getMax())); 171 checkRowValue(ROW_A, Bytes.toBytes(11L)); 172 assertEquals(MyObserver.tr10.getMin(), range10.getMin()); 173 assertEquals(MyObserver.tr10.getMax(), range10.getMax()); 174 175 time = EnvironmentEdgeManager.currentTime(); 176 mee.setValue(time); 177 TimeRange range2 = TimeRange.between(1, time + 20); 178 List<Row> actions = Arrays.asList(new Row[] { 179 new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L).setTimeRange(range2.getMin(), 180 range2.getMax()), 181 new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L).setTimeRange(range2.getMin(), 182 range2.getMax()) }); 183 Object[] results3 = new Object[actions.size()]; 184 Object[] results1 = results3; 185 hTableInterface.batch(actions, results1); 186 assertEquals(MyObserver.tr2.getMin(), range2.getMin()); 187 assertEquals(MyObserver.tr2.getMax(), range2.getMax()); 188 for (Object r2 : results1) { 189 assertTrue(r2 instanceof Result); 190 } 191 checkRowValue(ROW_A, Bytes.toBytes(15L)); 192 193 hTableInterface.close(); 194 } 195 196 private void checkRowValue(byte[] row, byte[] expectedValue) throws IOException { 197 Get get = new Get(row).addColumn(TEST_FAMILY, qualifierCol1); 198 Result result = hTableInterface.get(get); 199 byte[] actualValue = result.getValue(TEST_FAMILY, qualifierCol1); 200 assertArrayEquals(expectedValue, actualValue); 201 } 202}