001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static junit.framework.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Map; 026import java.util.Optional; 027import java.util.concurrent.atomic.AtomicInteger; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Admin; 035import org.apache.hadoop.hbase.client.Delete; 036import org.apache.hadoop.hbase.client.Durability; 037import org.apache.hadoop.hbase.client.Get; 038import org.apache.hadoop.hbase.client.Put; 039import org.apache.hadoop.hbase.client.Result; 040import org.apache.hadoop.hbase.client.Table; 041import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 042import org.apache.hadoop.hbase.testclassification.MediumTests; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 045import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 046import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 047import org.apache.hadoop.hbase.wal.WALEdit; 048import org.junit.AfterClass; 049import org.junit.Before; 050import org.junit.BeforeClass; 051import org.junit.ClassRule; 052import org.junit.Test; 053import org.junit.experimental.categories.Category; 054 055@Category({ CoprocessorTests.class, MediumTests.class }) 056public class TestRegionObserverBypass { 057 058 @ClassRule 059 public static final HBaseClassTestRule CLASS_RULE = 060 HBaseClassTestRule.forClass(TestRegionObserverBypass.class); 061 062 private static HBaseTestingUtility util; 063 private static final TableName tableName = TableName.valueOf("test"); 064 private static final byte[] dummy = Bytes.toBytes("dummy"); 065 private static final byte[] row1 = Bytes.toBytes("r1"); 066 private static final byte[] row2 = Bytes.toBytes("r2"); 067 private static final byte[] row3 = Bytes.toBytes("r3"); 068 private static final byte[] test = Bytes.toBytes("test"); 069 070 @BeforeClass 071 public static void setUpBeforeClass() throws Exception { 072 // Stack up three coprocessors just so I can check bypass skips subsequent calls. 073 Configuration conf = HBaseConfiguration.create(); 074 conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 075 new String[] { TestCoprocessor.class.getName(), TestCoprocessor2.class.getName(), 076 TestCoprocessor3.class.getName() }); 077 util = new HBaseTestingUtility(conf); 078 util.startMiniCluster(); 079 } 080 081 @AfterClass 082 public static void tearDownAfterClass() throws Exception { 083 util.shutdownMiniCluster(); 084 } 085 086 @Before 087 public void setUp() throws Exception { 088 Admin admin = util.getAdmin(); 089 if (admin.tableExists(tableName)) { 090 if (admin.isTableEnabled(tableName)) { 091 admin.disableTable(tableName); 092 } 093 admin.deleteTable(tableName); 094 } 095 util.createTable(tableName, new byte[][] { dummy, test }); 096 TestCoprocessor.PREPUT_BYPASSES.set(0); 097 TestCoprocessor.PREPUT_INVOCATIONS.set(0); 098 } 099 100 /** 101 * do a single put that is bypassed by a RegionObserver 102 */ 103 @Test 104 public void testSimple() throws Exception { 105 Table t = util.getConnection().getTable(tableName); 106 Put p = new Put(row1); 107 p.addColumn(test, dummy, dummy); 108 // before HBASE-4331, this would throw an exception 109 t.put(p); 110 checkRowAndDelete(t, row1, 0); 111 t.close(); 112 } 113 114 /** 115 * Test various multiput operations. If the column family is 'test', then bypass is invoked. 116 */ 117 @Test 118 public void testMulti() throws Exception { 119 // ensure that server time increments every time we do an operation, otherwise 120 // previous deletes will eclipse successive puts having the same timestamp 121 EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge()); 122 123 Table t = util.getConnection().getTable(tableName); 124 List<Put> puts = new ArrayList<>(); 125 Put p = new Put(row1); 126 p.addColumn(dummy, dummy, dummy); 127 puts.add(p); 128 p = new Put(row2); 129 p.addColumn(test, dummy, dummy); 130 puts.add(p); 131 p = new Put(row3); 132 p.addColumn(test, dummy, dummy); 133 puts.add(p); 134 // before HBASE-4331, this would throw an exception 135 t.put(puts); 136 checkRowAndDelete(t, row1, 1); 137 checkRowAndDelete(t, row2, 0); 138 checkRowAndDelete(t, row3, 0); 139 140 puts.clear(); 141 p = new Put(row1); 142 p.addColumn(test, dummy, dummy); 143 puts.add(p); 144 p = new Put(row2); 145 p.addColumn(test, dummy, dummy); 146 puts.add(p); 147 p = new Put(row3); 148 p.addColumn(test, dummy, dummy); 149 puts.add(p); 150 // before HBASE-4331, this would throw an exception 151 t.put(puts); 152 checkRowAndDelete(t, row1, 0); 153 checkRowAndDelete(t, row2, 0); 154 checkRowAndDelete(t, row3, 0); 155 156 puts.clear(); 157 p = new Put(row1); 158 p.addColumn(test, dummy, dummy); 159 puts.add(p); 160 p = new Put(row2); 161 p.addColumn(test, dummy, dummy); 162 puts.add(p); 163 p = new Put(row3); 164 p.addColumn(dummy, dummy, dummy); 165 puts.add(p); 166 // this worked fine even before HBASE-4331 167 t.put(puts); 168 checkRowAndDelete(t, row1, 0); 169 checkRowAndDelete(t, row2, 0); 170 checkRowAndDelete(t, row3, 1); 171 172 puts.clear(); 173 p = new Put(row1); 174 p.addColumn(dummy, dummy, dummy); 175 puts.add(p); 176 p = new Put(row2); 177 p.addColumn(test, dummy, dummy); 178 puts.add(p); 179 p = new Put(row3); 180 p.addColumn(dummy, dummy, dummy); 181 puts.add(p); 182 // this worked fine even before HBASE-4331 183 t.put(puts); 184 checkRowAndDelete(t, row1, 1); 185 checkRowAndDelete(t, row2, 0); 186 checkRowAndDelete(t, row3, 1); 187 188 puts.clear(); 189 p = new Put(row1); 190 p.addColumn(test, dummy, dummy); 191 puts.add(p); 192 p = new Put(row2); 193 p.addColumn(dummy, dummy, dummy); 194 puts.add(p); 195 p = new Put(row3); 196 p.addColumn(test, dummy, dummy); 197 puts.add(p); 198 // before HBASE-4331, this would throw an exception 199 t.put(puts); 200 checkRowAndDelete(t, row1, 0); 201 checkRowAndDelete(t, row2, 1); 202 checkRowAndDelete(t, row3, 0); 203 t.close(); 204 205 EnvironmentEdgeManager.reset(); 206 } 207 208 private void checkRowAndDelete(Table t, byte[] row, int count) throws IOException { 209 Get g = new Get(row); 210 Result r = t.get(g); 211 assertEquals(count, r.size()); 212 Delete d = new Delete(row); 213 t.delete(d); 214 } 215 216 /** 217 * Test that when bypass is called, we skip out calling any other coprocessors stacked up method, 218 * in this case, a prePut. If the column family is 'test', then bypass is invoked. 219 */ 220 @Test 221 public void testBypassAlsoCompletes() throws IOException { 222 // ensure that server time increments every time we do an operation, otherwise 223 // previous deletes will eclipse successive puts having the same timestamp 224 EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge()); 225 226 Table t = util.getConnection().getTable(tableName); 227 List<Put> puts = new ArrayList<>(); 228 Put p = new Put(row1); 229 p.addColumn(dummy, dummy, dummy); 230 puts.add(p); 231 p = new Put(row2); 232 p.addColumn(test, dummy, dummy); 233 puts.add(p); 234 p = new Put(row3); 235 p.addColumn(test, dummy, dummy); 236 puts.add(p); 237 t.put(puts); 238 // Ensure expected result. 239 checkRowAndDelete(t, row1, 1); 240 checkRowAndDelete(t, row2, 0); 241 checkRowAndDelete(t, row3, 0); 242 // We have three Coprocessors stacked up on the prePut. See the beforeClass setup. We did three 243 // puts above two of which bypassed. A bypass means do not call the other coprocessors in the 244 // stack so for the two 'test' calls in the above, we should not have call through to all all 245 // three coprocessors in the chain. So we should have: 246 // 3 invocations for first put + 1 invocation + 1 bypass for second put + 1 invocation + 247 // 1 bypass for the last put. Assert. 248 assertEquals("Total CP invocation count", 5, TestCoprocessor.PREPUT_INVOCATIONS.get()); 249 assertEquals("Total CP bypasses", 2, TestCoprocessor.PREPUT_BYPASSES.get()); 250 } 251 252 public static class TestCoprocessor implements RegionCoprocessor, RegionObserver { 253 static AtomicInteger PREPUT_INVOCATIONS = new AtomicInteger(0); 254 static AtomicInteger PREPUT_BYPASSES = new AtomicInteger(0); 255 256 @Override 257 public Optional<RegionObserver> getRegionObserver() { 258 return Optional.of(this); 259 } 260 261 @Override 262 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, 263 final WALEdit edit, final Durability durability) throws IOException { 264 PREPUT_INVOCATIONS.incrementAndGet(); 265 Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap(); 266 if (familyMap.containsKey(test)) { 267 PREPUT_BYPASSES.incrementAndGet(); 268 e.bypass(); 269 } 270 } 271 } 272 273 /** 274 * Calls through to TestCoprocessor. 275 */ 276 public static class TestCoprocessor2 extends TestRegionObserverBypass.TestCoprocessor { 277 } 278 279 /** 280 * Calls through to TestCoprocessor. 281 */ 282 public static class TestCoprocessor3 extends TestRegionObserverBypass.TestCoprocessor { 283 } 284}