/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Verifies that a {@link RegionObserver} can attach extra mutations to a client batch from
 * {@code preBatchMutate} through {@link MiniBatchOperationInProgress#addOperationsFromCP}, and
 * that those extra mutations are visible to scans and to the WAL observer.
 * <p>
 * Each test creates a table named after the test method and installs exactly one of the nested
 * coprocessor classes on it.
 */
@Category(MediumTests.class)
public class TestRegionObserverForAddingMutationsFromCoprocessors {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionObserverForAddingMutationsFromCoprocessors.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestRegionObserverForAddingMutationsFromCoprocessors.class);

  private static HBaseTestingUtility util;
  // "dummy" doubles as column family, qualifier and value, depending on the call site.
  private static final byte[] dummy = Bytes.toBytes("dummy");
  private static final byte[] row1 = Bytes.toBytes("r1");
  private static final byte[] row2 = Bytes.toBytes("r2");
  private static final byte[] row3 = Bytes.toBytes("r3");
  // "test" is the column family the coprocessors inspect; the delete tests also use it as the key
  // of a row that is never written.
  private static final byte[] test = Bytes.toBytes("test");

  @Rule
  public TestName name = new TestName();
  private TableName tableName;

  /**
   * Starts a mini cluster whose configuration registers {@link TestWALObserver} under
   * {@link CoprocessorHost#WAL_COPROCESSOR_CONF_KEY}.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, TestWALObserver.class.getName());
    util = new HBaseTestingUtility(conf);
    util.startMiniCluster();
  }

  /** Shuts the mini cluster down once all tests have run. */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

  /** Derives the per-test table name from the running test method's name. */
  @Before
  public void setUp() throws Exception {
    tableName = TableName.valueOf(name.getMethodName());
  }

  /**
   * Creates the table for the current test with families {@code dummy} and {@code test} and the
   * given coprocessor attached.
   * @param coprocessor fully qualified class name of the coprocessor to install on the table
   * @throws IOException if table creation fails
   */
  private void createTable(String coprocessor) throws IOException {
    HTableDescriptor htd = new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(dummy))
      .addFamily(new HColumnDescriptor(test)).addCoprocessor(coprocessor);
    util.getAdmin().createTable(htd);
  }

  /**
   * Test various multiput operations: the client writes only {@code row1}, the coprocessor adds
   * puts for {@code row1}, {@code row2} and {@code row3}, and a scan must return three rows.
   */
  @Test
  public void testMulti() throws Exception {
    createTable(TestMultiMutationCoprocessor.class.getName());

    try (Table t = util.getConnection().getTable(tableName)) {
      t.put(new Put(row1).addColumn(test, dummy, dummy));
      assertRowCount(t, 3);
    }
  }

  /**
   * Tests that added mutations from coprocessors end up in the WAL. Expects the edit captured by
   * {@link TestWALObserver} to hold 4 cells -- presumably the client's single cell plus the three
   * cells added by {@link TestMultiMutationCoprocessor}.
   */
  @Test
  public void testCPMutationsAreWrittenToWALEdit() throws Exception {
    createTable(TestMultiMutationCoprocessor.class.getName());

    try (Table t = util.getConnection().getTable(tableName)) {
      t.put(new Put(row1).addColumn(test, dummy, dummy));
      assertRowCount(t, 3);
    }

    assertNotNull(TestWALObserver.savedEdit);
    assertEquals(4, TestWALObserver.savedEdit.getCells().size());
  }

  /**
   * Scans the whole table, logs every returned {@link Result} and asserts how many were returned.
   * @param t        table to scan
   * @param expected number of results the full scan must yield
   * @throws IOException if the scan fails
   */
  private static void assertRowCount(Table t, int expected) throws IOException {
    try (ResultScanner scanner = t.getScanner(new Scan())) {
      int i = 0;
      for (Result r : scanner) {
        LOG.info("{}", r);
        i++;
      }
      assertEquals(expected, i);
    }
  }

  /**
   * Writes {@code row1}, {@code row2} and {@code row3} (one cell each in family {@code test}) in a
   * single batch and asserts that all three rows are visible.
   * @param t table to load
   * @throws IOException if the put or the verifying scan fails
   */
  private static void putThreeRows(Table t) throws IOException {
    t.put(Lists.newArrayList(new Put(row1).addColumn(test, dummy, dummy),
      new Put(row2).addColumn(test, dummy, dummy), new Put(row3).addColumn(test, dummy, dummy)));
    assertRowCount(t, 3);
  }

  /**
   * A client delete against a row that was never written triggers
   * {@link TestDeleteCellCoprocessor}, which adds column deletes for {@code row1} and
   * {@code row2}; only one row must remain.
   */
  @Test
  public void testDeleteCell() throws Exception {
    createTable(TestDeleteCellCoprocessor.class.getName());

    try (Table t = util.getConnection().getTable(tableName)) {
      putThreeRows(t);

      t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row
      assertRowCount(t, 1);
    }
  }

  /**
   * A client delete against a row that was never written triggers
   * {@link TestDeleteFamilyCoprocessor}, which adds family deletes for {@code row1} and
   * {@code row2}; only one row must remain.
   */
  @Test
  public void testDeleteFamily() throws Exception {
    createTable(TestDeleteFamilyCoprocessor.class.getName());

    try (Table t = util.getConnection().getTable(tableName)) {
      putThreeRows(t);

      t.delete(new Delete(test).addFamily(test)); // delete non-existing row
      assertRowCount(t, 1);
    }
  }

  /**
   * A client delete against a row that was never written triggers
   * {@link TestDeleteRowCoprocessor}, which adds whole-row deletes for {@code row1} and
   * {@code row2}; only one row must remain.
   */
  @Test
  public void testDeleteRow() throws Exception {
    createTable(TestDeleteRowCoprocessor.class.getName());

    try (Table t = util.getConnection().getTable(tableName)) {
      putThreeRows(t);

      // The triggering delete still names a column in family "test": the coprocessor takes its
      // timestamp from the first cell of that family in the incoming mutation.
      t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row
      assertRowCount(t, 1);
    }
  }

  /**
   * Writes one row with a TTL of 3000; {@link TestPutWithTTLCoprocessor} adds a second row that
   * copies the TTL. Both rows must be visible at first and gone after a 5000 ms sleep.
   */
  @Test
  public void testPutWithTTL() throws Exception {
    createTable(TestPutWithTTLCoprocessor.class.getName());

    try (Table t = util.getConnection().getTable(tableName)) {
      t.put(new Put(row1).addColumn(test, dummy, dummy).setTTL(3000));
      assertRowCount(t, 2);
      // wait long enough for the TTL to expire
      Thread.sleep(5000);
      assertRowCount(t, 0);
    }
  }

  /**
   * Adds one extra put (row {@code "cpPut"}) that reuses the timestamp of the first cell in family
   * {@code test} of the batch's first operation and copies that operation's TTL.
   */
  public static class TestPutWithTTLCoprocessor implements RegionCoprocessor, RegionObserver {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
      Mutation mut = miniBatchOp.getOperation(0);
      List<Cell> cells = mut.getFamilyCellMap().get(test);
      // Reuse the client cell's timestamp for the coprocessor-added cell.
      long ts = cells.get(0).getTimestamp();
      Put[] puts = new Put[] { new Put(Bytes.toBytes("cpPut"))
        .addColumn(test, dummy, ts, Bytes.toBytes("cpdummy")).setTTL(mut.getTTL()) };
      LOG.info("Putting:{}", Arrays.toString(puts));
      miniBatchOp.addOperationsFromCP(0, puts);
    }
  }

  /**
   * Adds three extra puts ({@code row1}, {@code row2}, {@code row3}) that reuse the timestamp of
   * the first cell in family {@code test} of the batch's first operation.
   */
  public static class TestMultiMutationCoprocessor implements RegionCoprocessor, RegionObserver {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
      Mutation mut = miniBatchOp.getOperation(0);
      List<Cell> cells = mut.getFamilyCellMap().get(test);
      long ts = cells.get(0).getTimestamp();
      Put[] puts = new Put[] {
        new Put(row1).addColumn(test, dummy, ts, Bytes.toBytes("cpdummy")),
        new Put(row2).addColumn(test, dummy, ts, dummy),
        new Put(row3).addColumn(test, dummy, ts, dummy), };
      LOG.info("Putting:{}", Arrays.toString(puts));
      miniBatchOp.addOperationsFromCP(0, puts);
    }
  }

  /**
   * When the batch's first operation is a {@link Delete}, adds column deletes
   * ({@code addColumns}) for {@code row1} and {@code row2}; other mutation types pass through
   * untouched.
   */
  public static class TestDeleteCellCoprocessor implements RegionCoprocessor, RegionObserver {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
      Mutation mut = miniBatchOp.getOperation(0);

      if (mut instanceof Delete) {
        List<Cell> cells = mut.getFamilyCellMap().get(test);
        long ts = cells.get(0).getTimestamp();
        Delete[] deletes = new Delete[] {
          // delete only 2 rows
          new Delete(row1).addColumns(test, dummy, ts),
          new Delete(row2).addColumns(test, dummy, ts), };
        LOG.info("Deleting:{}", Arrays.toString(deletes));
        miniBatchOp.addOperationsFromCP(0, deletes);
      }
    }
  }

  /**
   * When the batch's first operation is a {@link Delete}, adds family deletes
   * ({@code addFamily}) for {@code row1} and {@code row2}; other mutation types pass through
   * untouched.
   */
  public static class TestDeleteFamilyCoprocessor implements RegionCoprocessor, RegionObserver {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
      Mutation mut = miniBatchOp.getOperation(0);

      if (mut instanceof Delete) {
        List<Cell> cells = mut.getFamilyCellMap().get(test);
        long ts = cells.get(0).getTimestamp();
        Delete[] deletes = new Delete[] {
          // delete only 2 rows
          new Delete(row1).addFamily(test, ts),
          new Delete(row2).addFamily(test, ts), };
        LOG.info("Deleting:{}", Arrays.toString(deletes));
        miniBatchOp.addOperationsFromCP(0, deletes);
      }
    }
  }

  /**
   * When the batch's first operation is a {@link Delete}, adds whole-row deletes for
   * {@code row1} and {@code row2}; other mutation types pass through untouched.
   */
  public static class TestDeleteRowCoprocessor implements RegionCoprocessor, RegionObserver {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
      Mutation mut = miniBatchOp.getOperation(0);

      if (mut instanceof Delete) {
        List<Cell> cells = mut.getFamilyCellMap().get(test);
        long ts = cells.get(0).getTimestamp();
        Delete[] deletes = new Delete[] {
          // delete only 2 rows
          new Delete(row1, ts),
          new Delete(row2, ts), };
        LOG.info("Deleting:{}", Arrays.toString(deletes));
        miniBatchOp.addOperationsFromCP(0, deletes);
      }
    }
  }

  /**
   * WAL observer that remembers the most recent {@link WALEdit} written for the table named
   * {@code testCPMutationsAreWrittenToWALEdit}, so the test of that name can inspect it.
   */
  public static class TestWALObserver implements WALCoprocessor, WALObserver {
    // volatile: assigned in postWALWrite and read from the test method; presumably these run on
    // different threads (server-side WAL path vs. JUnit thread), so publish the write explicitly.
    static volatile WALEdit savedEdit = null;

    @Override
    public Optional<WALObserver> getWALObserver() {
      return Optional.of(this);
    }

    @Override
    public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
      RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
      // Only capture edits for the one test table that asserts on the WAL contents.
      if (info.getTable().equals(TableName.valueOf("testCPMutationsAreWrittenToWALEdit"))) {
        savedEdit = logEdit;
      }
    }
  }
}