001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022 023import java.io.IOException; 024import java.util.Arrays; 025import java.util.List; 026import java.util.Optional; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.HBaseTestingUtil; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 034import org.apache.hadoop.hbase.client.Delete; 035import org.apache.hadoop.hbase.client.Mutation; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.RegionInfo; 038import org.apache.hadoop.hbase.client.Result; 039import org.apache.hadoop.hbase.client.ResultScanner; 040import org.apache.hadoop.hbase.client.Scan; 041import org.apache.hadoop.hbase.client.Table; 042import org.apache.hadoop.hbase.client.TableDescriptor; 043import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 044import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.hbase.wal.WALEdit; 048import org.apache.hadoop.hbase.wal.WALKey; 049import org.junit.AfterClass; 050import org.junit.Before; 051import org.junit.BeforeClass; 052import org.junit.ClassRule; 053import org.junit.Rule; 054import org.junit.Test; 055import org.junit.experimental.categories.Category; 056import org.junit.rules.TestName; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 061 062@Category(MediumTests.class) 063public class TestRegionObserverForAddingMutationsFromCoprocessors { 064 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestRegionObserverForAddingMutationsFromCoprocessors.class); 068 069 private static final Logger LOG = 070 LoggerFactory.getLogger(TestRegionObserverForAddingMutationsFromCoprocessors.class); 071 072 private static HBaseTestingUtil util; 073 private static final byte[] dummy = Bytes.toBytes("dummy"); 074 private static final byte[] row1 = Bytes.toBytes("r1"); 075 private static final byte[] row2 = Bytes.toBytes("r2"); 076 private static final byte[] row3 = Bytes.toBytes("r3"); 077 private static final byte[] test = Bytes.toBytes("test"); 078 079 @Rule 080 public TestName name = new TestName(); 081 private TableName tableName; 082 083 @BeforeClass 084 public static void setUpBeforeClass() throws Exception { 085 Configuration conf = HBaseConfiguration.create(); 086 conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, TestWALObserver.class.getName()); 087 util = new HBaseTestingUtil(conf); 088 util.startMiniCluster(); 089 } 090 091 @AfterClass 092 public static void tearDownAfterClass() throws Exception { 093 util.shutdownMiniCluster(); 094 } 095 096 @Before 097 public void setUp() throws Exception { 098 tableName = TableName.valueOf(name.getMethodName()); 099 } 100 101 private void createTable(String coprocessor) throws IOException { 102 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 103 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(dummy)) 104 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(test)).setCoprocessor(coprocessor).build(); 105 util.getAdmin().createTable(tableDescriptor); 106 } 107 108 /** 109 * Test various multiput operations. 110 */ 111 @Test 112 public void testMulti() throws Exception { 113 createTable(TestMultiMutationCoprocessor.class.getName()); 114 115 try (Table t = util.getConnection().getTable(tableName)) { 116 t.put(new Put(row1).addColumn(test, dummy, dummy)); 117 assertRowCount(t, 3); 118 } 119 } 120 121 /** 122 * Tests that added mutations from coprocessors end up in the WAL. 123 */ 124 @Test 125 public void testCPMutationsAreWrittenToWALEdit() throws Exception { 126 createTable(TestMultiMutationCoprocessor.class.getName()); 127 128 try (Table t = util.getConnection().getTable(tableName)) { 129 t.put(new Put(row1).addColumn(test, dummy, dummy)); 130 assertRowCount(t, 3); 131 } 132 133 assertNotNull(TestWALObserver.savedEdit); 134 assertEquals(4, TestWALObserver.savedEdit.getCells().size()); 135 } 136 137 private static void assertRowCount(Table t, int expected) throws IOException { 138 try (ResultScanner scanner = t.getScanner(new Scan())) { 139 int i = 0; 140 for (Result r : scanner) { 141 LOG.info(r.toString()); 142 i++; 143 } 144 assertEquals(expected, i); 145 } 146 } 147 148 @Test 149 public void testDeleteCell() throws Exception { 150 createTable(TestDeleteCellCoprocessor.class.getName()); 151 152 try (Table t = util.getConnection().getTable(tableName)) { 153 t.put(Lists.newArrayList(new Put(row1).addColumn(test, dummy, dummy), 154 new Put(row2).addColumn(test, dummy, dummy), new Put(row3).addColumn(test, dummy, dummy))); 155 156 assertRowCount(t, 3); 157 158 t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row 159 assertRowCount(t, 1); 160 } 161 } 162 163 @Test 164 public void testDeleteFamily() throws Exception { 165 createTable(TestDeleteFamilyCoprocessor.class.getName()); 166 167 try (Table t = util.getConnection().getTable(tableName)) { 168 t.put(Lists.newArrayList(new Put(row1).addColumn(test, dummy, dummy), 169 new Put(row2).addColumn(test, dummy, dummy), new Put(row3).addColumn(test, dummy, dummy))); 170 171 assertRowCount(t, 3); 172 173 t.delete(new Delete(test).addFamily(test)); // delete non-existing row 174 assertRowCount(t, 1); 175 } 176 } 177 178 @Test 179 public void testDeleteRow() throws Exception { 180 createTable(TestDeleteRowCoprocessor.class.getName()); 181 182 try (Table t = util.getConnection().getTable(tableName)) { 183 t.put(Lists.newArrayList(new Put(row1).addColumn(test, dummy, dummy), 184 new Put(row2).addColumn(test, dummy, dummy), new Put(row3).addColumn(test, dummy, dummy))); 185 186 assertRowCount(t, 3); 187 188 t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row 189 assertRowCount(t, 1); 190 } 191 } 192 193 @Test 194 public void testPutWithTTL() throws Exception { 195 createTable(TestPutWithTTLCoprocessor.class.getName()); 196 197 try (Table t = util.getConnection().getTable(tableName)) { 198 t.put(new Put(row1).addColumn(test, dummy, dummy).setTTL(3000)); 199 assertRowCount(t, 2); 200 // wait long enough for the TTL to expire 201 Thread.sleep(5000); 202 assertRowCount(t, 0); 203 } 204 } 205 206 public static class TestPutWithTTLCoprocessor implements RegionCoprocessor, RegionObserver { 207 @Override 208 public Optional<RegionObserver> getRegionObserver() { 209 return Optional.of(this); 210 } 211 212 @Override 213 public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 214 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 215 Mutation mut = miniBatchOp.getOperation(0); 216 List<Cell> cells = mut.getFamilyCellMap().get(test); 217 Put[] puts = new Put[] { new Put(Bytes.toBytes("cpPut")) 218 .addColumn(test, dummy, cells.get(0).getTimestamp(), Bytes.toBytes("cpdummy")) 219 .setTTL(mut.getTTL()) }; 220 LOG.info("Putting:" + Arrays.toString(puts)); 221 miniBatchOp.addOperationsFromCP(0, puts); 222 } 223 } 224 225 public static class TestMultiMutationCoprocessor implements RegionCoprocessor, RegionObserver { 226 @Override 227 public Optional<RegionObserver> getRegionObserver() { 228 return Optional.of(this); 229 } 230 231 @Override 232 public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 233 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 234 Mutation mut = miniBatchOp.getOperation(0); 235 List<Cell> cells = mut.getFamilyCellMap().get(test); 236 Put[] puts = new Put[] { 237 new Put(row1).addColumn(test, dummy, cells.get(0).getTimestamp(), Bytes.toBytes("cpdummy")), 238 new Put(row2).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy), 239 new Put(row3).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy), }; 240 LOG.info("Putting:" + Arrays.toString(puts)); 241 miniBatchOp.addOperationsFromCP(0, puts); 242 } 243 } 244 245 public static class TestDeleteCellCoprocessor implements RegionCoprocessor, RegionObserver { 246 @Override 247 public Optional<RegionObserver> getRegionObserver() { 248 return Optional.of(this); 249 } 250 251 @Override 252 public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 253 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 254 Mutation mut = miniBatchOp.getOperation(0); 255 256 if (mut instanceof Delete) { 257 List<Cell> cells = mut.getFamilyCellMap().get(test); 258 Delete[] deletes = new Delete[] { 259 // delete only 2 rows 260 new Delete(row1).addColumns(test, dummy, cells.get(0).getTimestamp()), 261 new Delete(row2).addColumns(test, dummy, cells.get(0).getTimestamp()), }; 262 LOG.info("Deleting:" + Arrays.toString(deletes)); 263 miniBatchOp.addOperationsFromCP(0, deletes); 264 } 265 } 266 } 267 268 public static class TestDeleteFamilyCoprocessor implements RegionCoprocessor, RegionObserver { 269 @Override 270 public Optional<RegionObserver> getRegionObserver() { 271 return Optional.of(this); 272 } 273 274 @Override 275 public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 276 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 277 Mutation mut = miniBatchOp.getOperation(0); 278 279 if (mut instanceof Delete) { 280 List<Cell> cells = mut.getFamilyCellMap().get(test); 281 Delete[] deletes = new Delete[] { 282 // delete only 2 rows 283 new Delete(row1).addFamily(test, cells.get(0).getTimestamp()), 284 new Delete(row2).addFamily(test, cells.get(0).getTimestamp()), }; 285 LOG.info("Deleting:" + Arrays.toString(deletes)); 286 miniBatchOp.addOperationsFromCP(0, deletes); 287 } 288 } 289 } 290 291 public static class TestDeleteRowCoprocessor implements RegionCoprocessor, RegionObserver { 292 @Override 293 public Optional<RegionObserver> getRegionObserver() { 294 return Optional.of(this); 295 } 296 297 @Override 298 public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 299 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 300 Mutation mut = miniBatchOp.getOperation(0); 301 302 if (mut instanceof Delete) { 303 List<Cell> cells = mut.getFamilyCellMap().get(test); 304 Delete[] deletes = new Delete[] { 305 // delete only 2 rows 306 new Delete(row1, cells.get(0).getTimestamp()), 307 new Delete(row2, cells.get(0).getTimestamp()), }; 308 LOG.info("Deleting:" + Arrays.toString(deletes)); 309 miniBatchOp.addOperationsFromCP(0, deletes); 310 } 311 } 312 } 313 314 public static class TestWALObserver implements WALCoprocessor, WALObserver { 315 static WALEdit savedEdit = null; 316 317 @Override 318 public Optional<WALObserver> getWALObserver() { 319 return Optional.of(this); 320 } 321 322 @Override 323 public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx, 324 RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { 325 if (info.getTable().equals(TableName.valueOf("testCPMutationsAreWrittenToWALEdit"))) { 326 savedEdit = logEdit; 327 } 328 } 329 } 330}