001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.nio.ByteBuffer; 026import java.util.Arrays; 027import java.util.List; 028import java.util.Optional; 029import java.util.concurrent.atomic.AtomicInteger; 030import java.util.stream.Collectors; 031import java.util.stream.IntStream; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.CellUtil; 034import org.apache.hadoop.hbase.CompareOperator; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtility; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.KeyValueUtil; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.Append; 042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 043import org.apache.hadoop.hbase.client.Delete; 044import org.apache.hadoop.hbase.client.Durability; 045import org.apache.hadoop.hbase.client.Get; 046import org.apache.hadoop.hbase.client.Increment; 047import org.apache.hadoop.hbase.client.Put; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 051import org.apache.hadoop.hbase.filter.ByteArrayComparable; 052import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 053import org.apache.hadoop.hbase.testclassification.MediumTests; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.hbase.wal.WALEdit; 056import org.junit.AfterClass; 057import org.junit.Before; 058import org.junit.BeforeClass; 059import org.junit.ClassRule; 060import org.junit.Rule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.junit.rules.TestName; 064 065@Category({ CoprocessorTests.class, MediumTests.class }) 066public class TestPassCustomCellViaRegionObserver { 067 068 @ClassRule 069 public static final HBaseClassTestRule CLASS_RULE = 070 HBaseClassTestRule.forClass(TestPassCustomCellViaRegionObserver.class); 071 072 @Rule 073 public TestName testName = new TestName(); 074 075 private TableName tableName; 076 private Table table = null; 077 078 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 079 080 private static final byte[] ROW = Bytes.toBytes("ROW"); 081 private static final byte[] FAMILY = Bytes.toBytes("FAMILY"); 082 private static final byte[] QUALIFIER = Bytes.toBytes("QUALIFIER"); 083 private static final byte[] VALUE = Bytes.toBytes(10L); 084 private static final byte[] APPEND_VALUE = Bytes.toBytes("MB"); 085 086 private static final byte[] QUALIFIER_FROM_CP = Bytes.toBytes("QUALIFIER_FROM_CP"); 087 088 @BeforeClass 089 public static void setupBeforeClass() throws Exception { 090 // small retry number can speed up the failed tests. 091 UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 092 UTIL.startMiniCluster(); 093 } 094 095 @AfterClass 096 public static void tearDownAfterClass() throws Exception { 097 UTIL.shutdownMiniCluster(); 098 } 099 100 @Before 101 public void clearTable() throws IOException { 102 RegionObserverImpl.COUNT.set(0); 103 tableName = TableName.valueOf(testName.getMethodName()); 104 if (table != null) { 105 table.close(); 106 } 107 try (Admin admin = UTIL.getAdmin()) { 108 for (TableName name : admin.listTableNames()) { 109 try { 110 admin.disableTable(name); 111 } catch (IOException e) { 112 } 113 admin.deleteTable(name); 114 } 115 table = UTIL.createTable(TableDescriptorBuilder.newBuilder(tableName) 116 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 117 .setCoprocessor(RegionObserverImpl.class.getName()).build(), null); 118 } 119 } 120 121 @Test 122 public void testMutation() throws Exception { 123 124 Put put = new Put(ROW); 125 put.addColumn(FAMILY, QUALIFIER, VALUE); 126 table.put(put); 127 byte[] value = VALUE; 128 assertResult(table.get(new Get(ROW)), value, value); 129 assertObserverHasExecuted(); 130 131 Increment inc = new Increment(ROW); 132 inc.addColumn(FAMILY, QUALIFIER, 10L); 133 table.increment(inc); 134 // QUALIFIER -> 10 (put) + 10 (increment) 135 // QUALIFIER_FROM_CP -> 10 (from cp's put) + 10 (from cp's increment) 136 value = Bytes.toBytes(20L); 137 assertResult(table.get(new Get(ROW)), value, value); 138 assertObserverHasExecuted(); 139 140 Append append = new Append(ROW); 141 append.addColumn(FAMILY, QUALIFIER, APPEND_VALUE); 142 table.append(append); 143 // 10L + "MB" 144 value = ByteBuffer.wrap(new byte[value.length + APPEND_VALUE.length]).put(value) 145 .put(APPEND_VALUE).array(); 146 assertResult(table.get(new Get(ROW)), value, value); 147 assertObserverHasExecuted(); 148 149 Delete delete = new Delete(ROW); 150 delete.addColumns(FAMILY, QUALIFIER); 151 table.delete(delete); 152 assertTrue(Arrays.asList(table.get(new Get(ROW)).rawCells()).toString(), 153 table.get(new Get(ROW)).isEmpty()); 154 assertObserverHasExecuted(); 155 156 assertTrue(table.checkAndPut(ROW, FAMILY, QUALIFIER, null, put)); 157 assertObserverHasExecuted(); 158 159 assertTrue(table.checkAndDelete(ROW, FAMILY, QUALIFIER, VALUE, delete)); 160 assertObserverHasExecuted(); 161 162 assertTrue(table.get(new Get(ROW)).isEmpty()); 163 } 164 165 @Test 166 public void testMultiPut() throws Exception { 167 List<Put> puts = 168 IntStream.range(0, 10).mapToObj(i -> new Put(ROW).addColumn(FAMILY, Bytes.toBytes(i), VALUE)) 169 .collect(Collectors.toList()); 170 table.put(puts); 171 assertResult(table.get(new Get(ROW)), VALUE); 172 assertObserverHasExecuted(); 173 174 List<Delete> deletes = 175 IntStream.range(0, 10).mapToObj(i -> new Delete(ROW).addColumn(FAMILY, Bytes.toBytes(i))) 176 .collect(Collectors.toList()); 177 table.delete(deletes); 178 assertTrue(table.get(new Get(ROW)).isEmpty()); 179 assertObserverHasExecuted(); 180 } 181 182 private static void assertObserverHasExecuted() { 183 assertTrue(RegionObserverImpl.COUNT.getAndSet(0) > 0); 184 } 185 186 private static void assertResult(Result result, byte[] expectedValue) { 187 assertFalse(result.isEmpty()); 188 for (Cell c : result.rawCells()) { 189 assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c))); 190 assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c))); 191 assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c))); 192 } 193 } 194 195 private static void assertResult(Result result, byte[] expectedValue, byte[] expectedFromCp) { 196 assertFalse(result.isEmpty()); 197 for (Cell c : result.rawCells()) { 198 assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c))); 199 assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c))); 200 if (Bytes.equals(QUALIFIER, CellUtil.cloneQualifier(c))) { 201 assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c))); 202 } else if (Bytes.equals(QUALIFIER_FROM_CP, CellUtil.cloneQualifier(c))) { 203 assertTrue(c.toString(), Bytes.equals(expectedFromCp, CellUtil.cloneValue(c))); 204 } else { 205 fail("No valid qualifier"); 206 } 207 } 208 } 209 210 private static Cell createCustomCell(byte[] row, byte[] family, byte[] qualifier, Cell.Type type, 211 byte[] value) { 212 return new Cell() { 213 214 @Override 215 public long heapSize() { 216 return 0; 217 } 218 219 private byte[] getArray(byte[] array) { 220 return array == null ? HConstants.EMPTY_BYTE_ARRAY : array; 221 } 222 223 private int length(byte[] array) { 224 return array == null ? 0 : array.length; 225 } 226 227 @Override 228 public byte[] getRowArray() { 229 return getArray(row); 230 } 231 232 @Override 233 public int getRowOffset() { 234 return 0; 235 } 236 237 @Override 238 public short getRowLength() { 239 return (short) length(row); 240 } 241 242 @Override 243 public byte[] getFamilyArray() { 244 return getArray(family); 245 } 246 247 @Override 248 public int getFamilyOffset() { 249 return 0; 250 } 251 252 @Override 253 public byte getFamilyLength() { 254 return (byte) length(family); 255 } 256 257 @Override 258 public byte[] getQualifierArray() { 259 return getArray(qualifier); 260 } 261 262 @Override 263 public int getQualifierOffset() { 264 return 0; 265 } 266 267 @Override 268 public int getQualifierLength() { 269 return length(qualifier); 270 } 271 272 @Override 273 public long getTimestamp() { 274 return HConstants.LATEST_TIMESTAMP; 275 } 276 277 @Override 278 public byte getTypeByte() { 279 return type.getCode(); 280 } 281 282 @Override 283 public long getSequenceId() { 284 return 0; 285 } 286 287 @Override 288 public byte[] getValueArray() { 289 return getArray(value); 290 } 291 292 @Override 293 public int getValueOffset() { 294 return 0; 295 } 296 297 @Override 298 public int getValueLength() { 299 return length(value); 300 } 301 302 @Override 303 public int getSerializedSize() { 304 return KeyValueUtil.getSerializedSize(this, true); 305 } 306 307 @Override 308 public byte[] getTagsArray() { 309 return getArray(null); 310 } 311 312 @Override 313 public int getTagsOffset() { 314 return 0; 315 } 316 317 @Override 318 public int getTagsLength() { 319 return length(null); 320 } 321 322 @Override 323 public Type getType() { 324 return type; 325 } 326 }; 327 } 328 329 private static Cell createCustomCell(Put put) { 330 return createCustomCell(put.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE); 331 } 332 333 private static Cell createCustomCell(Append append) { 334 return createCustomCell(append.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, 335 APPEND_VALUE); 336 } 337 338 private static Cell createCustomCell(Increment inc) { 339 return createCustomCell(inc.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.Put, VALUE); 340 } 341 342 private static Cell createCustomCell(Delete delete) { 343 return createCustomCell(delete.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.Type.DeleteColumn, 344 null); 345 } 346 347 public static class RegionObserverImpl implements RegionCoprocessor, RegionObserver { 348 static final AtomicInteger COUNT = new AtomicInteger(0); 349 350 @Override 351 public Optional<RegionObserver> getRegionObserver() { 352 return Optional.of(this); 353 } 354 355 @Override 356 public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, 357 Durability durability) throws IOException { 358 put.add(createCustomCell(put)); 359 COUNT.incrementAndGet(); 360 } 361 362 @Override 363 public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, 364 WALEdit edit, Durability durability) throws IOException { 365 delete.add(createCustomCell(delete)); 366 COUNT.incrementAndGet(); 367 } 368 369 @Override 370 public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row, 371 byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put, 372 boolean result) throws IOException { 373 put.add(createCustomCell(put)); 374 COUNT.incrementAndGet(); 375 return result; 376 } 377 378 @Override 379 public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row, 380 byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, 381 Delete delete, boolean result) throws IOException { 382 delete.add(createCustomCell(delete)); 383 COUNT.incrementAndGet(); 384 return result; 385 } 386 387 @Override 388 public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append) 389 throws IOException { 390 append.add(createCustomCell(append)); 391 COUNT.incrementAndGet(); 392 return null; 393 } 394 395 @Override 396 public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment) 397 throws IOException { 398 increment.add(createCustomCell(increment)); 399 COUNT.incrementAndGet(); 400 return null; 401 } 402 403 } 404 405}