001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.CoreMatchers.instanceOf; 022import static org.hamcrest.MatcherAssert.assertThat; 023import static org.junit.Assert.assertArrayEquals; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNull; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029 030import java.io.IOException; 031import java.io.UncheckedIOException; 032import java.util.Arrays; 033import java.util.Collections; 034import java.util.List; 035import java.util.concurrent.ArrayBlockingQueue; 036import java.util.concurrent.BlockingQueue; 037import java.util.concurrent.CountDownLatch; 038import java.util.concurrent.ExecutionException; 039import java.util.concurrent.ForkJoinPool; 040import java.util.concurrent.atomic.AtomicInteger; 041import java.util.concurrent.atomic.AtomicLong; 042import java.util.function.Supplier; 043import java.util.stream.IntStream; 044import org.apache.hadoop.hbase.CompareOperator; 045import org.apache.hadoop.hbase.HBaseClassTestRule; 046import org.apache.hadoop.hbase.HBaseTestingUtil; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.TableNotEnabledException; 049import org.apache.hadoop.hbase.filter.BinaryComparator; 050import org.apache.hadoop.hbase.filter.FamilyFilter; 051import org.apache.hadoop.hbase.filter.FilterList; 052import org.apache.hadoop.hbase.filter.QualifierFilter; 053import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 054import org.apache.hadoop.hbase.filter.TimestampsFilter; 055import org.apache.hadoop.hbase.io.TimeRange; 056import org.apache.hadoop.hbase.testclassification.ClientTests; 057import org.apache.hadoop.hbase.testclassification.MediumTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 060import org.apache.hadoop.hbase.util.Pair; 061import org.junit.AfterClass; 062import org.junit.Before; 063import org.junit.BeforeClass; 064import org.junit.ClassRule; 065import org.junit.Rule; 066import org.junit.Test; 067import org.junit.experimental.categories.Category; 068import org.junit.rules.TestName; 069import org.junit.runner.RunWith; 070import org.junit.runners.Parameterized; 071import org.junit.runners.Parameterized.Parameter; 072import org.junit.runners.Parameterized.Parameters; 073 074import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 075 076@RunWith(Parameterized.class) 077@Category({ MediumTests.class, ClientTests.class }) 078public class TestAsyncTable { 079 080 @ClassRule 081 public static final HBaseClassTestRule CLASS_RULE = 082 HBaseClassTestRule.forClass(TestAsyncTable.class); 083 084 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 085 086 private static TableName TABLE_NAME = TableName.valueOf("async"); 087 088 private static byte[] FAMILY = Bytes.toBytes("cf"); 089 090 private static byte[] QUALIFIER = Bytes.toBytes("cq"); 091 092 private static byte[] VALUE = Bytes.toBytes("value"); 093 094 private static int MAX_KEY_VALUE_SIZE = 64 * 1024; 095 096 private static AsyncConnection ASYNC_CONN; 097 098 @Rule 099 public TestName testName = new TestName(); 100 101 private byte[] row; 102 103 @Parameter 104 public Supplier<AsyncTable<?>> getTable; 105 106 private static AsyncTable<?> getRawTable() { 107 return ASYNC_CONN.getTable(TABLE_NAME); 108 } 109 110 private static AsyncTable<?> getTable() { 111 return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); 112 } 113 114 @Parameters 115 public static List<Object[]> params() { 116 return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable }, 117 new Supplier<?>[] { TestAsyncTable::getTable }); 118 } 119 120 @BeforeClass 121 public static void setUpBeforeClass() throws Exception { 122 TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 123 MAX_KEY_VALUE_SIZE); 124 TEST_UTIL.startMiniCluster(1); 125 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 126 TEST_UTIL.waitTableAvailable(TABLE_NAME); 127 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 128 assertFalse(ASYNC_CONN.isClosed()); 129 } 130 131 @AfterClass 132 public static void tearDownAfterClass() throws Exception { 133 Closeables.close(ASYNC_CONN, true); 134 assertTrue(ASYNC_CONN.isClosed()); 135 TEST_UTIL.shutdownMiniCluster(); 136 } 137 138 @Before 139 public void setUp() throws IOException, InterruptedException, ExecutionException { 140 row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_")); 141 if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) { 142 ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get(); 143 } 144 } 145 146 @Test 147 public void testSimple() throws Exception { 148 AsyncTable<?> table = getTable.get(); 149 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); 150 assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); 151 Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); 152 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 153 table.delete(new Delete(row)).get(); 154 result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); 155 assertTrue(result.isEmpty()); 156 assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); 157 } 158 159 private byte[] concat(byte[] base, int index) { 160 return Bytes.toBytes(Bytes.toString(base) + "-" + index); 161 } 162 163 @SuppressWarnings("FutureReturnValueIgnored") 164 @Test 165 public void testSimpleMultiple() throws Exception { 166 AsyncTable<?> table = getTable.get(); 167 int count = 100; 168 CountDownLatch putLatch = new CountDownLatch(count); 169 IntStream.range(0, count).forEach( 170 i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))) 171 .thenAccept(x -> putLatch.countDown())); 172 putLatch.await(); 173 BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count); 174 IntStream.range(0, count) 175 .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 176 .thenAccept(x -> existsResp.add(x))); 177 for (int i = 0; i < count; i++) { 178 assertTrue(existsResp.take()); 179 } 180 BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count); 181 IntStream.range(0, count) 182 .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 183 .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); 184 for (int i = 0; i < count; i++) { 185 Pair<Integer, Result> pair = getResp.take(); 186 assertArrayEquals(concat(VALUE, pair.getFirst()), 187 pair.getSecond().getValue(FAMILY, QUALIFIER)); 188 } 189 CountDownLatch deleteLatch = new CountDownLatch(count); 190 IntStream.range(0, count).forEach( 191 i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown())); 192 deleteLatch.await(); 193 IntStream.range(0, count) 194 .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 195 .thenAccept(x -> existsResp.add(x))); 196 for (int i = 0; i < count; i++) { 197 assertFalse(existsResp.take()); 198 } 199 IntStream.range(0, count) 200 .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 201 .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); 202 for (int i = 0; i < count; i++) { 203 Pair<Integer, Result> pair = getResp.take(); 204 assertTrue(pair.getSecond().isEmpty()); 205 } 206 } 207 208 @SuppressWarnings("FutureReturnValueIgnored") 209 @Test 210 public void testIncrement() throws InterruptedException, ExecutionException { 211 AsyncTable<?> table = getTable.get(); 212 int count = 100; 213 CountDownLatch latch = new CountDownLatch(count); 214 AtomicLong sum = new AtomicLong(0L); 215 IntStream.range(0, count) 216 .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> { 217 sum.addAndGet(x); 218 latch.countDown(); 219 })); 220 latch.await(); 221 assertEquals(count, Bytes.toLong( 222 table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER))); 223 assertEquals((1 + count) * count / 2, sum.get()); 224 } 225 226 @SuppressWarnings("FutureReturnValueIgnored") 227 @Test 228 public void testAppend() throws InterruptedException, ExecutionException { 229 AsyncTable<?> table = getTable.get(); 230 int count = 10; 231 CountDownLatch latch = new CountDownLatch(count); 232 char suffix = ':'; 233 AtomicLong suffixCount = new AtomicLong(0L); 234 IntStream.range(0, count) 235 .forEachOrdered(i -> table 236 .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix))) 237 .thenAccept(r -> { 238 suffixCount.addAndGet( 239 Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count()); 240 latch.countDown(); 241 })); 242 latch.await(); 243 assertEquals((1 + count) * count / 2, suffixCount.get()); 244 String value = Bytes.toString( 245 table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)); 246 int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt) 247 .sorted().toArray(); 248 assertArrayEquals(IntStream.range(0, count).toArray(), actual); 249 } 250 251 @Test 252 public void testMutateRow() throws InterruptedException, ExecutionException, IOException { 253 AsyncTable<?> table = getTable.get(); 254 RowMutations mutation = new RowMutations(row); 255 mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE)); 256 Result result = table.mutateRow(mutation).get(); 257 assertTrue(result.getExists()); 258 assertTrue(result.isEmpty()); 259 260 result = table.get(new Get(row)).get(); 261 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1))); 262 263 mutation = new RowMutations(row); 264 mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1))); 265 mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE)); 266 mutation.add(new Increment(row).addColumn(FAMILY, concat(QUALIFIER, 3), 2L)); 267 mutation.add(new Append(row).addColumn(FAMILY, concat(QUALIFIER, 4), Bytes.toBytes("abc"))); 268 result = table.mutateRow(mutation).get(); 269 assertTrue(result.getExists()); 270 assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3)))); 271 assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4)))); 272 273 result = table.get(new Get(row)).get(); 274 assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1))); 275 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2))); 276 assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3)))); 277 assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4)))); 278 } 279 280 // Tests for old checkAndMutate API 281 282 @SuppressWarnings("FutureReturnValueIgnored") 283 @Test 284 @Deprecated 285 public void testCheckAndPutForOldApi() throws InterruptedException, ExecutionException { 286 AsyncTable<?> table = getTable.get(); 287 AtomicInteger successCount = new AtomicInteger(0); 288 AtomicInteger successIndex = new AtomicInteger(-1); 289 int count = 10; 290 CountDownLatch latch = new CountDownLatch(count); 291 IntStream.range(0, count) 292 .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists() 293 .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> { 294 if (x) { 295 successCount.incrementAndGet(); 296 successIndex.set(i); 297 } 298 latch.countDown(); 299 })); 300 latch.await(); 301 assertEquals(1, successCount.get()); 302 String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER)); 303 assertTrue(actual.endsWith(Integer.toString(successIndex.get()))); 304 } 305 306 @SuppressWarnings("FutureReturnValueIgnored") 307 @Test 308 @Deprecated 309 public void testCheckAndDeleteForOldApi() throws InterruptedException, ExecutionException { 310 AsyncTable<?> table = getTable.get(); 311 int count = 10; 312 CountDownLatch putLatch = new CountDownLatch(count + 1); 313 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 314 IntStream.range(0, count) 315 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 316 .thenRun(() -> putLatch.countDown())); 317 putLatch.await(); 318 319 AtomicInteger successCount = new AtomicInteger(0); 320 AtomicInteger successIndex = new AtomicInteger(-1); 321 CountDownLatch deleteLatch = new CountDownLatch(count); 322 IntStream.range(0, count) 323 .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE) 324 .thenDelete( 325 new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) 326 .thenAccept(x -> { 327 if (x) { 328 successCount.incrementAndGet(); 329 successIndex.set(i); 330 } 331 deleteLatch.countDown(); 332 })); 333 deleteLatch.await(); 334 assertEquals(1, successCount.get()); 335 Result result = table.get(new Get(row)).get(); 336 IntStream.range(0, count).forEach(i -> { 337 if (i == successIndex.get()) { 338 assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i))); 339 } else { 340 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 341 } 342 }); 343 } 344 345 @SuppressWarnings("FutureReturnValueIgnored") 346 @Test 347 @Deprecated 348 public void testCheckAndMutateForOldApi() throws InterruptedException, ExecutionException { 349 AsyncTable<?> table = getTable.get(); 350 int count = 10; 351 CountDownLatch putLatch = new CountDownLatch(count + 1); 352 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 353 IntStream.range(0, count) 354 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 355 .thenRun(() -> putLatch.countDown())); 356 putLatch.await(); 357 358 AtomicInteger successCount = new AtomicInteger(0); 359 AtomicInteger successIndex = new AtomicInteger(-1); 360 CountDownLatch mutateLatch = new CountDownLatch(count); 361 IntStream.range(0, count).forEach(i -> { 362 RowMutations mutation = new RowMutations(row); 363 try { 364 mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER)); 365 mutation 366 .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i))); 367 } catch (IOException e) { 368 throw new UncheckedIOException(e); 369 } 370 table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation) 371 .thenAccept(x -> { 372 if (x) { 373 successCount.incrementAndGet(); 374 successIndex.set(i); 375 } 376 mutateLatch.countDown(); 377 }); 378 }); 379 mutateLatch.await(); 380 assertEquals(1, successCount.get()); 381 Result result = table.get(new Get(row)).get(); 382 IntStream.range(0, count).forEach(i -> { 383 if (i == successIndex.get()) { 384 assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i))); 385 } else { 386 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 387 } 388 }); 389 } 390 391 @Test 392 @Deprecated 393 public void testCheckAndMutateWithTimeRangeForOldApi() throws Exception { 394 AsyncTable<?> table = getTable.get(); 395 final long ts = EnvironmentEdgeManager.currentTime() / 2; 396 Put put = new Put(row); 397 put.addColumn(FAMILY, QUALIFIER, ts, VALUE); 398 399 boolean ok = 400 table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get(); 401 assertTrue(ok); 402 403 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) 404 .ifEquals(VALUE).thenPut(put).get(); 405 assertFalse(ok); 406 407 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) 408 .ifEquals(VALUE).thenPut(put).get(); 409 assertTrue(ok); 410 411 RowMutations rm = new RowMutations(row).add((Mutation) put); 412 413 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) 414 .ifEquals(VALUE).thenMutate(rm).get(); 415 assertFalse(ok); 416 417 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) 418 .ifEquals(VALUE).thenMutate(rm).get(); 419 assertTrue(ok); 420 421 Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER); 422 423 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) 424 .ifEquals(VALUE).thenDelete(delete).get(); 425 assertFalse(ok); 426 427 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) 428 .ifEquals(VALUE).thenDelete(delete).get(); 429 assertTrue(ok); 430 } 431 432 @Test 433 @Deprecated 434 public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable { 435 AsyncTable<?> table = getTable.get(); 436 437 // Put one row 438 Put put = new Put(row); 439 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); 440 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); 441 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); 442 table.put(put).get(); 443 444 // Put with success 445 boolean ok = table 446 .checkAndMutate(row, 447 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 448 Bytes.toBytes("a"))) 449 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))).get(); 450 assertTrue(ok); 451 452 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 453 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 454 455 // Put with failure 456 ok = table 457 .checkAndMutate(row, 458 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 459 Bytes.toBytes("b"))) 460 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))).get(); 461 assertFalse(ok); 462 463 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); 464 465 // Delete with success 466 ok = table 467 .checkAndMutate(row, 468 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 469 Bytes.toBytes("a"))) 470 .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))).get(); 471 assertTrue(ok); 472 473 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); 474 475 // Mutate with success 476 ok = table 477 .checkAndMutate(row, 478 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 479 Bytes.toBytes("b"))) 480 .thenMutate(new RowMutations(row) 481 .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 482 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))) 483 .get(); 484 assertTrue(ok); 485 486 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 487 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 488 489 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 490 } 491 492 @Test 493 @Deprecated 494 public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable { 495 AsyncTable<?> table = getTable.get(); 496 497 // Put one row 498 Put put = new Put(row); 499 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); 500 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); 501 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); 502 table.put(put).get(); 503 504 // Put with success 505 boolean ok = table 506 .checkAndMutate(row, 507 new FilterList( 508 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 509 Bytes.toBytes("a")), 510 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 511 Bytes.toBytes("b")))) 512 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))).get(); 513 assertTrue(ok); 514 515 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 516 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 517 518 // Put with failure 519 ok = table 520 .checkAndMutate(row, 521 new FilterList( 522 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 523 Bytes.toBytes("a")), 524 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 525 Bytes.toBytes("c")))) 526 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))).get(); 527 assertFalse(ok); 528 529 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); 530 531 // Delete with success 532 ok = table 533 .checkAndMutate(row, 534 new FilterList( 535 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 536 Bytes.toBytes("a")), 537 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 538 Bytes.toBytes("b")))) 539 .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))).get(); 540 assertTrue(ok); 541 542 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); 543 544 // Mutate with success 545 ok = table 546 .checkAndMutate(row, 547 new FilterList( 548 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 549 Bytes.toBytes("a")), 550 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 551 Bytes.toBytes("b")))) 552 .thenMutate(new RowMutations(row) 553 .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 554 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))) 555 .get(); 556 assertTrue(ok); 557 558 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 559 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 560 561 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 562 } 563 564 @Test 565 @Deprecated 566 public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable { 567 AsyncTable<?> table = getTable.get(); 568 569 // Put with specifying the timestamp 570 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); 571 572 // Put with success 573 boolean ok = table 574 .checkAndMutate(row, 575 new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 576 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 577 new TimestampsFilter(Collections.singletonList(100L)))) 578 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))).get(); 579 assertTrue(ok); 580 581 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 582 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 583 584 // Put with failure 585 ok = table 586 .checkAndMutate(row, 587 new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 588 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 589 new TimestampsFilter(Collections.singletonList(101L)))) 590 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))).get(); 591 assertFalse(ok); 592 593 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 594 } 595 596 @Test 597 @Deprecated 598 public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable { 599 AsyncTable<?> table = getTable.get(); 600 601 // Put with specifying the timestamp 602 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); 603 604 // Put with success 605 boolean ok = table 606 .checkAndMutate(row, 607 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 608 Bytes.toBytes("a"))) 609 .timeRange(TimeRange.between(0, 101)) 610 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))).get(); 611 assertTrue(ok); 612 613 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 614 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 615 616 // Put with failure 617 ok = table 618 .checkAndMutate(row, 619 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 620 Bytes.toBytes("a"))) 621 .timeRange(TimeRange.between(0, 100)) 622 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))).get(); 623 assertFalse(ok); 624 625 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 626 } 627 628 @Test(expected = NullPointerException.class) 629 @Deprecated 630 public void testCheckAndMutateWithoutConditionForOldApi() { 631 getTable.get().checkAndMutate(row, FAMILY) 632 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); 633 } 634 635 // Tests for new CheckAndMutate API 636 637 @SuppressWarnings("FutureReturnValueIgnored") 638 @Test 639 public void testCheckAndPut() throws InterruptedException, ExecutionException { 640 AsyncTable<?> table = getTable.get(); 641 AtomicInteger successCount = new AtomicInteger(0); 642 AtomicInteger successIndex = new AtomicInteger(-1); 643 int count = 10; 644 CountDownLatch latch = new CountDownLatch(count); 645 646 IntStream.range(0, count).forEach( 647 i -> table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER) 648 .build(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))).thenAccept(x -> { 649 if (x.isSuccess()) { 650 successCount.incrementAndGet(); 651 successIndex.set(i); 652 } 653 assertNull(x.getResult()); 654 latch.countDown(); 655 })); 656 latch.await(); 657 assertEquals(1, successCount.get()); 658 String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER)); 659 assertTrue(actual.endsWith(Integer.toString(successIndex.get()))); 660 } 661 662 @SuppressWarnings("FutureReturnValueIgnored") 663 @Test 664 public void testCheckAndDelete() throws InterruptedException, ExecutionException { 665 AsyncTable<?> table = getTable.get(); 666 int count = 10; 667 CountDownLatch putLatch = new CountDownLatch(count + 1); 668 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 669 IntStream.range(0, count) 670 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 671 .thenRun(() -> putLatch.countDown())); 672 putLatch.await(); 673 674 AtomicInteger successCount = new AtomicInteger(0); 675 AtomicInteger successIndex = new AtomicInteger(-1); 676 CountDownLatch deleteLatch = new CountDownLatch(count); 677 678 IntStream.range(0, count) 679 .forEach(i -> table 680 .checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE).build( 681 new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))) 682 .thenAccept(x -> { 683 if (x.isSuccess()) { 684 successCount.incrementAndGet(); 685 successIndex.set(i); 686 } 687 assertNull(x.getResult()); 688 deleteLatch.countDown(); 689 })); 690 deleteLatch.await(); 691 assertEquals(1, successCount.get()); 692 Result result = table.get(new Get(row)).get(); 693 IntStream.range(0, count).forEach(i -> { 694 if (i == successIndex.get()) { 695 assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i))); 696 } else { 697 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 698 } 699 }); 700 } 701 702 @SuppressWarnings("FutureReturnValueIgnored") 703 @Test 704 public void testCheckAndMutate() throws InterruptedException, ExecutionException { 705 AsyncTable<?> table = getTable.get(); 706 int count = 10; 707 CountDownLatch putLatch = new CountDownLatch(count + 1); 708 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 709 IntStream.range(0, count) 710 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 711 .thenRun(() -> putLatch.countDown())); 712 putLatch.await(); 713 714 AtomicInteger successCount = new AtomicInteger(0); 715 AtomicInteger successIndex = new AtomicInteger(-1); 716 CountDownLatch mutateLatch = new CountDownLatch(count); 717 IntStream.range(0, count).forEach(i -> { 718 RowMutations mutation = new RowMutations(row); 719 try { 720 mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER)); 721 mutation 722 .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i))); 723 } catch (IOException e) { 724 throw new UncheckedIOException(e); 725 } 726 727 table 728 .checkAndMutate( 729 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE).build(mutation)) 730 .thenAccept(x -> { 731 if (x.isSuccess()) { 732 successCount.incrementAndGet(); 733 successIndex.set(i); 734 } 735 assertNull(x.getResult()); 736 mutateLatch.countDown(); 737 }); 738 }); 739 mutateLatch.await(); 740 assertEquals(1, successCount.get()); 741 Result result = table.get(new Get(row)).get(); 742 IntStream.range(0, count).forEach(i -> { 743 if (i == successIndex.get()) { 744 assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i))); 745 } else { 746 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 747 } 748 }); 749 } 750 751 @Test 752 public void testCheckAndMutateWithTimeRange() throws Exception { 753 AsyncTable<?> table = getTable.get(); 754 final long ts = EnvironmentEdgeManager.currentTime() / 2; 755 Put put = new Put(row); 756 put.addColumn(FAMILY, QUALIFIER, ts, VALUE); 757 758 CheckAndMutateResult result = 759 table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER).build(put)) 760 .get(); 761 assertTrue(result.isSuccess()); 762 assertNull(result.getResult()); 763 764 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 765 .timeRange(TimeRange.at(ts + 10000)).build(put)).get(); 766 assertFalse(result.isSuccess()); 767 assertNull(result.getResult()); 768 769 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 770 .timeRange(TimeRange.at(ts)).build(put)).get(); 771 assertTrue(result.isSuccess()); 772 assertNull(result.getResult()); 773 774 RowMutations rm = new RowMutations(row).add((Mutation) put); 775 776 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 777 .timeRange(TimeRange.at(ts + 10000)).build(rm)).get(); 778 assertFalse(result.isSuccess()); 779 assertNull(result.getResult()); 780 781 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 782 .timeRange(TimeRange.at(ts)).build(rm)).get(); 783 assertTrue(result.isSuccess()); 784 assertNull(result.getResult()); 785 786 Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER); 787 788 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 789 .timeRange(TimeRange.at(ts + 10000)).build(delete)).get(); 790 assertFalse(result.isSuccess()); 791 assertNull(result.getResult()); 792 793 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 794 .timeRange(TimeRange.at(ts)).build(delete)).get(); 795 assertTrue(result.isSuccess()); 796 assertNull(result.getResult()); 797 } 798 799 @Test 800 public void testCheckAndMutateWithSingleFilter() throws Throwable { 801 AsyncTable<?> table = getTable.get(); 802 803 // Put one row 804 Put put = new Put(row); 805 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); 806 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); 807 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); 808 table.put(put).get(); 809 810 // Put with success 811 CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 812 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 813 Bytes.toBytes("a"))) 814 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get(); 815 assertTrue(result.isSuccess()); 816 assertNull(result.getResult()); 817 818 Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 819 assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D")))); 820 821 // Put with failure 822 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 823 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 824 Bytes.toBytes("b"))) 825 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get(); 826 assertFalse(result.isSuccess()); 827 assertNull(result.getResult()); 828 829 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); 830 831 // Delete with success 832 result = 833 table 834 .checkAndMutate(CheckAndMutate.newBuilder(row) 835 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 836 Bytes.toBytes("a"))) 837 .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))) 838 .get(); 839 assertTrue(result.isSuccess()); 840 assertNull(result.getResult()); 841 842 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); 843 844 // Mutate with success 845 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 846 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 847 Bytes.toBytes("b"))) 848 .build(new RowMutations(row) 849 .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 850 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))) 851 .get(); 852 assertTrue(result.isSuccess()); 853 assertNull(result.getResult()); 854 855 r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 856 assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D")))); 857 858 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 859 } 860 861 @Test 862 public void testCheckAndMutateWithMultipleFilters() throws Throwable { 863 AsyncTable<?> table = getTable.get(); 864 865 // Put one row 866 Put put = new Put(row); 867 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); 868 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); 869 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); 870 table.put(put).get(); 871 872 // Put with success 873 CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 874 .ifMatches(new FilterList( 875 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 876 Bytes.toBytes("a")), 877 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 878 Bytes.toBytes("b")))) 879 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get(); 880 assertTrue(result.isSuccess()); 881 assertNull(result.getResult()); 882 883 Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 884 assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D")))); 885 886 // Put with failure 887 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 888 .ifMatches(new FilterList( 889 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 890 Bytes.toBytes("a")), 891 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 892 Bytes.toBytes("c")))) 893 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get(); 894 assertFalse(result.isSuccess()); 895 assertNull(result.getResult()); 896 897 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); 898 899 // Delete with success 900 result = table 901 .checkAndMutate(CheckAndMutate.newBuilder(row) 902 .ifMatches(new FilterList( 903 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 904 Bytes.toBytes("a")), 905 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 906 Bytes.toBytes("b")))) 907 .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))) 908 .get(); 909 assertTrue(result.isSuccess()); 910 assertNull(result.getResult()); 911 912 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); 913 914 // Mutate with success 915 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 916 .ifMatches(new FilterList( 917 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 918 Bytes.toBytes("a")), 919 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 920 Bytes.toBytes("b")))) 921 .build(new RowMutations(row) 922 .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 923 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))) 924 .get(); 925 assertTrue(result.isSuccess()); 926 assertNull(result.getResult()); 927 928 r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 929 assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D")))); 930 931 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 932 } 933 934 @Test 935 public void testCheckAndMutateWithTimestampFilter() throws Throwable { 936 AsyncTable<?> table = getTable.get(); 937 938 // Put with specifying the timestamp 939 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); 940 941 // Put with success 942 CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 943 .ifMatches( 944 new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 945 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 946 new TimestampsFilter(Collections.singletonList(100L)))) 947 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get(); 948 assertTrue(result.isSuccess()); 949 assertNull(result.getResult()); 950 951 Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 952 assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B")))); 953 954 // Put with failure 955 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 956 .ifMatches( 957 new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 958 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 959 new TimestampsFilter(Collections.singletonList(101L)))) 960 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get(); 961 assertFalse(result.isSuccess()); 962 assertNull(result.getResult()); 963 964 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 965 } 966 967 @Test 968 public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable { 969 AsyncTable<?> table = getTable.get(); 970 971 // Put with specifying the timestamp 972 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); 973 974 // Put with success 975 CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 976 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 977 Bytes.toBytes("a"))) 978 .timeRange(TimeRange.between(0, 101)) 979 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get(); 980 assertTrue(result.isSuccess()); 981 assertNull(result.getResult()); 982 983 Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 984 assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B")))); 985 986 // Put with failure 987 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 988 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 989 Bytes.toBytes("a"))) 990 .timeRange(TimeRange.between(0, 100)) 991 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get(); 992 assertFalse(result.isSuccess()); 993 assertNull(result.getResult()); 994 995 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 996 } 997 998 @Test 999 public void testCheckAndIncrement() throws Throwable { 1000 AsyncTable<?> table = getTable.get(); 1001 1002 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get(); 1003 1004 // CheckAndIncrement with correct value 1005 CheckAndMutateResult res = table.checkAndMutate( 1006 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1007 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))) 1008 .get(); 1009 assertTrue(res.isSuccess()); 1010 assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1011 1012 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1013 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1014 1015 // CheckAndIncrement with wrong value 1016 res = table.checkAndMutate( 1017 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b")) 1018 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))) 1019 .get(); 1020 assertFalse(res.isSuccess()); 1021 assertNull(res.getResult()); 1022 1023 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1024 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1025 1026 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))); 1027 1028 // CheckAndIncrement with a filter and correct value 1029 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1030 .ifMatches(new FilterList( 1031 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1032 Bytes.toBytes("a")), 1033 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1034 Bytes.toBytes("c")))) 1035 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get(); 1036 assertTrue(res.isSuccess()); 1037 assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1038 1039 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1040 assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1041 1042 // CheckAndIncrement with a filter and correct value 1043 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1044 .ifMatches(new FilterList( 1045 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1046 Bytes.toBytes("b")), 1047 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1048 Bytes.toBytes("d")))) 1049 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get(); 1050 assertFalse(res.isSuccess()); 1051 assertNull(res.getResult()); 1052 1053 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1054 assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1055 } 1056 1057 @Test 1058 public void testCheckAndAppend() throws Throwable { 1059 AsyncTable<?> table = getTable.get(); 1060 1061 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get(); 1062 1063 // CheckAndAppend with correct value 1064 CheckAndMutateResult res = 1065 table 1066 .checkAndMutate( 1067 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1068 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))) 1069 .get(); 1070 assertTrue(res.isSuccess()); 1071 assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1072 1073 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1074 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1075 1076 // CheckAndAppend with correct value 1077 res = 1078 table 1079 .checkAndMutate( 1080 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b")) 1081 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))) 1082 .get(); 1083 assertFalse(res.isSuccess()); 1084 assertNull(res.getResult()); 1085 1086 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1087 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1088 1089 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))); 1090 1091 // CheckAndAppend with a filter and correct value 1092 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1093 .ifMatches(new FilterList( 1094 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1095 Bytes.toBytes("a")), 1096 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1097 Bytes.toBytes("c")))) 1098 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get(); 1099 assertTrue(res.isSuccess()); 1100 assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1101 1102 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1103 assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1104 1105 // CheckAndAppend with a filter and wrong value 1106 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1107 .ifMatches(new FilterList( 1108 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1109 Bytes.toBytes("b")), 1110 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1111 Bytes.toBytes("d")))) 1112 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get(); 1113 assertFalse(res.isSuccess()); 1114 assertNull(res.getResult()); 1115 1116 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1117 assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1118 } 1119 1120 @Test 1121 public void testCheckAndRowMutations() throws Throwable { 1122 final byte[] q1 = Bytes.toBytes("q1"); 1123 final byte[] q2 = Bytes.toBytes("q2"); 1124 final byte[] q3 = Bytes.toBytes("q3"); 1125 final byte[] q4 = Bytes.toBytes("q4"); 1126 final String v1 = "v1"; 1127 1128 AsyncTable<?> table = getTable.get(); 1129 1130 // Initial values 1131 table.putAll(Arrays.asList(new Put(row).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")), 1132 new Put(row).addColumn(FAMILY, q3, Bytes.toBytes(5L)), 1133 new Put(row).addColumn(FAMILY, q4, Bytes.toBytes("a")))).get(); 1134 1135 // Do CheckAndRowMutations 1136 CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, q1).build( 1137 new RowMutations(row).add(Arrays.asList(new Put(row).addColumn(FAMILY, q1, Bytes.toBytes(v1)), 1138 new Delete(row).addColumns(FAMILY, q2), new Increment(row).addColumn(FAMILY, q3, 1), 1139 new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))); 1140 1141 CheckAndMutateResult result = table.checkAndMutate(checkAndMutate).get(); 1142 assertTrue(result.isSuccess()); 1143 assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3))); 1144 assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, q4))); 1145 1146 // Verify the value 1147 Result r = table.get(new Get(row)).get(); 1148 assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1))); 1149 assertNull(r.getValue(FAMILY, q2)); 1150 assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3))); 1151 assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4))); 1152 1153 // Do CheckAndRowMutations again 1154 checkAndMutate = CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, q1) 1155 .build(new RowMutations(row).add(Arrays.asList(new Delete(row).addColumns(FAMILY, q1), 1156 new Put(row).addColumn(FAMILY, q2, Bytes.toBytes(v1)), 1157 new Increment(row).addColumn(FAMILY, q3, 1), 1158 new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))); 1159 1160 result = table.checkAndMutate(checkAndMutate).get(); 1161 assertFalse(result.isSuccess()); 1162 assertNull(result.getResult()); 1163 1164 // Verify the value 1165 r = table.get(new Get(row)).get(); 1166 assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1))); 1167 assertNull(r.getValue(FAMILY, q2)); 1168 assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3))); 1169 assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4))); 1170 } 1171 1172 // Tests for batch version of checkAndMutate 1173 1174 @Test 1175 public void testCheckAndMutateBatch() throws Throwable { 1176 AsyncTable<?> table = getTable.get(); 1177 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1178 byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); 1179 byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); 1180 1181 table 1182 .putAll(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 1183 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 1184 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 1185 new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))) 1186 .get(); 1187 1188 // Test for Put 1189 CheckAndMutate checkAndMutate1 = 1190 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1191 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); 1192 1193 CheckAndMutate checkAndMutate2 = 1194 CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a")) 1195 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); 1196 1197 List<CheckAndMutateResult> results = 1198 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1199 1200 assertTrue(results.get(0).isSuccess()); 1201 assertNull(results.get(0).getResult()); 1202 assertFalse(results.get(1).isSuccess()); 1203 assertNull(results.get(1).getResult()); 1204 1205 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); 1206 assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1207 1208 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1209 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1210 1211 // Test for Delete 1212 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1213 .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")).build(new Delete(row)); 1214 1215 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1216 .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a")).build(new Delete(row2)); 1217 1218 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1219 1220 assertTrue(results.get(0).isSuccess()); 1221 assertNull(results.get(0).getResult()); 1222 assertFalse(results.get(1).isSuccess()); 1223 assertNull(results.get(1).getResult()); 1224 1225 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 1226 1227 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1228 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1229 1230 // Test for RowMutations 1231 checkAndMutate1 = 1232 CheckAndMutate.newBuilder(row3).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")) 1233 .build(new RowMutations(row3) 1234 .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) 1235 .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))); 1236 1237 checkAndMutate2 = 1238 CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")) 1239 .build(new RowMutations(row4) 1240 .add((Mutation) new Put(row4).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) 1241 .add((Mutation) new Delete(row4).addColumns(FAMILY, Bytes.toBytes("D")))); 1242 1243 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1244 1245 assertTrue(results.get(0).isSuccess()); 1246 assertNull(results.get(0).getResult()); 1247 assertFalse(results.get(1).isSuccess()); 1248 assertNull(results.get(1).getResult()); 1249 1250 result = table.get(new Get(row3)).get(); 1251 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1252 assertNull(result.getValue(FAMILY, Bytes.toBytes("D"))); 1253 1254 result = table.get(new Get(row4)).get(); 1255 assertNull(result.getValue(FAMILY, Bytes.toBytes("F"))); 1256 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1257 } 1258 1259 @Test 1260 public void testCheckAndMutateBatch2() throws Throwable { 1261 AsyncTable<?> table = getTable.get(); 1262 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1263 byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); 1264 byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); 1265 1266 table 1267 .putAll(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 1268 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 1269 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")), 1270 new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")))) 1271 .get(); 1272 1273 // Test for ifNotExists() 1274 CheckAndMutate checkAndMutate1 = 1275 CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, Bytes.toBytes("B")) 1276 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); 1277 1278 CheckAndMutate checkAndMutate2 = 1279 CheckAndMutate.newBuilder(row2).ifNotExists(FAMILY, Bytes.toBytes("B")) 1280 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); 1281 1282 List<CheckAndMutateResult> results = 1283 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1284 1285 assertTrue(results.get(0).isSuccess()); 1286 assertNull(results.get(0).getResult()); 1287 assertFalse(results.get(1).isSuccess()); 1288 assertNull(results.get(1).getResult()); 1289 1290 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); 1291 assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1292 1293 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1294 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1295 1296 // Test for ifMatches() 1297 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1298 .ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a")) 1299 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))); 1300 1301 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1302 .ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b")) 1303 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); 1304 1305 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1306 1307 assertTrue(results.get(0).isSuccess()); 1308 assertNull(results.get(0).getResult()); 1309 assertFalse(results.get(1).isSuccess()); 1310 assertNull(results.get(1).getResult()); 1311 1312 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); 1313 assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1314 1315 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1316 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1317 1318 // Test for timeRange() 1319 checkAndMutate1 = CheckAndMutate.newBuilder(row3) 1320 .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).timeRange(TimeRange.between(0, 101)) 1321 .build(new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e"))); 1322 1323 checkAndMutate2 = CheckAndMutate.newBuilder(row4) 1324 .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")).timeRange(TimeRange.between(0, 100)) 1325 .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))); 1326 1327 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1328 1329 assertTrue(results.get(0).isSuccess()); 1330 assertNull(results.get(0).getResult()); 1331 assertFalse(results.get(1).isSuccess()); 1332 assertNull(results.get(1).getResult()); 1333 1334 result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get(); 1335 assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1336 1337 result = table.get(new Get(row4).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 1338 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1339 } 1340 1341 @Test 1342 public void testCheckAndMutateBatchWithFilter() throws Throwable { 1343 AsyncTable<?> table = getTable.get(); 1344 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1345 1346 table.putAll(Arrays.asList( 1347 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1348 .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) 1349 .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 1350 new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")) 1351 .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")) 1352 .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))) 1353 .get(); 1354 1355 // Test for Put 1356 CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) 1357 .ifMatches(new FilterList( 1358 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1359 Bytes.toBytes("a")), 1360 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1361 Bytes.toBytes("b")))) 1362 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); 1363 1364 CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1365 .ifMatches(new FilterList( 1366 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1367 Bytes.toBytes("a")), 1368 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1369 Bytes.toBytes("b")))) 1370 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); 1371 1372 List<CheckAndMutateResult> results = 1373 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1374 1375 assertTrue(results.get(0).isSuccess()); 1376 assertNull(results.get(0).getResult()); 1377 assertFalse(results.get(1).isSuccess()); 1378 assertNull(results.get(1).getResult()); 1379 1380 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get(); 1381 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1382 1383 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); 1384 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1385 1386 // Test for Delete 1387 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1388 .ifMatches(new FilterList( 1389 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1390 Bytes.toBytes("a")), 1391 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1392 Bytes.toBytes("b")))) 1393 .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("C"))); 1394 1395 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1396 .ifMatches(new FilterList( 1397 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1398 Bytes.toBytes("a")), 1399 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1400 Bytes.toBytes("b")))) 1401 .build(new Delete(row2).addColumn(FAMILY, Bytes.toBytes("F"))); 1402 1403 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1404 1405 assertTrue(results.get(0).isSuccess()); 1406 assertNull(results.get(0).getResult()); 1407 assertFalse(results.get(1).isSuccess()); 1408 assertNull(results.get(1).getResult()); 1409 1410 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 1411 1412 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); 1413 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1414 1415 // Test for RowMutations 1416 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1417 .ifMatches(new FilterList( 1418 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1419 Bytes.toBytes("a")), 1420 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1421 Bytes.toBytes("b")))) 1422 .build(new RowMutations(row) 1423 .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))) 1424 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))); 1425 1426 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1427 .ifMatches(new FilterList( 1428 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1429 Bytes.toBytes("a")), 1430 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1431 Bytes.toBytes("b")))) 1432 .build(new RowMutations(row2) 1433 .add((Mutation) new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g"))) 1434 .add((Mutation) new Delete(row2).addColumns(FAMILY, Bytes.toBytes("D")))); 1435 1436 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1437 1438 assertTrue(results.get(0).isSuccess()); 1439 assertNull(results.get(0).getResult()); 1440 assertFalse(results.get(1).isSuccess()); 1441 assertNull(results.get(1).getResult()); 1442 1443 result = table.get(new Get(row)).get(); 1444 assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); 1445 assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1446 1447 result = table.get(new Get(row2)).get(); 1448 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1449 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1450 } 1451 1452 @Test 1453 public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable { 1454 AsyncTable<?> table = getTable.get(); 1455 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1456 1457 table.putAll(Arrays.asList( 1458 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")) 1459 .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b")) 1460 .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 1461 new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")) 1462 .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e")) 1463 .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))) 1464 .get(); 1465 1466 CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) 1467 .ifMatches(new FilterList( 1468 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1469 Bytes.toBytes("a")), 1470 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1471 Bytes.toBytes("b")))) 1472 .timeRange(TimeRange.between(0, 101)) 1473 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); 1474 1475 CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1476 .ifMatches(new FilterList( 1477 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1478 Bytes.toBytes("d")), 1479 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1480 Bytes.toBytes("e")))) 1481 .timeRange(TimeRange.between(0, 100)) 1482 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); 1483 1484 List<CheckAndMutateResult> results = 1485 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1486 1487 assertTrue(results.get(0).isSuccess()); 1488 assertNull(results.get(0).getResult()); 1489 assertFalse(results.get(1).isSuccess()); 1490 assertNull(results.get(1).getResult()); 1491 1492 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get(); 1493 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1494 1495 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); 1496 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1497 } 1498 1499 @Test 1500 public void testCheckAndIncrementBatch() throws Throwable { 1501 AsyncTable<?> table = getTable.get(); 1502 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1503 1504 table.putAll(Arrays.asList( 1505 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")).addColumn(FAMILY, 1506 Bytes.toBytes("B"), Bytes.toBytes(0L)), 1507 new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).addColumn(FAMILY, 1508 Bytes.toBytes("D"), Bytes.toBytes(0L)))) 1509 .get(); 1510 1511 // CheckAndIncrement with correct value 1512 CheckAndMutate checkAndMutate1 = 1513 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1514 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1)); 1515 1516 // CheckAndIncrement with wrong value 1517 CheckAndMutate checkAndMutate2 = 1518 CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d")) 1519 .build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1)); 1520 1521 List<CheckAndMutateResult> results = 1522 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1523 1524 assertTrue(results.get(0).isSuccess()); 1525 assertEquals(1, Bytes.toLong(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1526 assertFalse(results.get(1).isSuccess()); 1527 assertNull(results.get(1).getResult()); 1528 1529 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1530 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1531 1532 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 1533 assertEquals(0, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("D")))); 1534 } 1535 1536 @Test 1537 public void testCheckAndAppendBatch() throws Throwable { 1538 AsyncTable<?> table = getTable.get(); 1539 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1540 1541 table.putAll(Arrays.asList( 1542 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")).addColumn(FAMILY, 1543 Bytes.toBytes("B"), Bytes.toBytes("b")), 1544 new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).addColumn(FAMILY, 1545 Bytes.toBytes("D"), Bytes.toBytes("d")))) 1546 .get(); 1547 1548 // CheckAndAppend with correct value 1549 CheckAndMutate checkAndMutate1 = 1550 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1551 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); 1552 1553 // CheckAndAppend with wrong value 1554 CheckAndMutate checkAndMutate2 = 1555 CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d")) 1556 .build(new Append(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); 1557 1558 List<CheckAndMutateResult> results = 1559 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1560 1561 assertTrue(results.get(0).isSuccess()); 1562 assertEquals("bb", 1563 Bytes.toString(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1564 assertFalse(results.get(1).isSuccess()); 1565 assertNull(results.get(1).getResult()); 1566 1567 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1568 assertEquals("bb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1569 1570 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 1571 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1572 } 1573 1574 @Test 1575 public void testCheckAndRowMutationsBatch() throws Throwable { 1576 AsyncTable<?> table = getTable.get(); 1577 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1578 1579 table.putAll(Arrays.asList( 1580 new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) 1581 .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L)) 1582 .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), 1583 new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")) 1584 .addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L)) 1585 .addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")))) 1586 .get(); 1587 1588 // CheckAndIncrement with correct value 1589 CheckAndMutate checkAndMutate1 = 1590 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) 1591 .build(new RowMutations(row) 1592 .add(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 1593 new Delete(row).addColumns(FAMILY, Bytes.toBytes("B")), 1594 new Increment(row).addColumn(FAMILY, Bytes.toBytes("C"), 1L), 1595 new Append(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))))); 1596 1597 // CheckAndIncrement with wrong value 1598 CheckAndMutate checkAndMutate2 = 1599 CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a")) 1600 .build(new RowMutations(row2).add( 1601 Arrays.asList(new Put(row2).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")), 1602 new Delete(row2).addColumns(FAMILY, Bytes.toBytes("F")), 1603 new Increment(row2).addColumn(FAMILY, Bytes.toBytes("G"), 1L), 1604 new Append(row2).addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h"))))); 1605 1606 List<CheckAndMutateResult> results = 1607 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1608 1609 assertTrue(results.get(0).isSuccess()); 1610 assertEquals(2, Bytes.toLong(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("C")))); 1611 assertEquals("dd", 1612 Bytes.toString(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("D")))); 1613 1614 assertFalse(results.get(1).isSuccess()); 1615 assertNull(results.get(1).getResult()); 1616 1617 Result result = table.get(new Get(row)).get(); 1618 assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1619 assertNull(result.getValue(FAMILY, Bytes.toBytes("B"))); 1620 assertEquals(2, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); 1621 assertEquals("dd", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1622 1623 result = table.get(new Get(row2)).get(); 1624 assertNull(result.getValue(FAMILY, Bytes.toBytes("E"))); 1625 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1626 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("G")))); 1627 assertEquals("h", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("H")))); 1628 } 1629 1630 @Test 1631 public void testDisabled() throws InterruptedException, ExecutionException { 1632 ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get(); 1633 try { 1634 getTable.get().get(new Get(row)).get(); 1635 fail("Should fail since table has been disabled"); 1636 } catch (ExecutionException e) { 1637 Throwable cause = e.getCause(); 1638 assertThat(cause, instanceOf(TableNotEnabledException.class)); 1639 assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString())); 1640 } 1641 } 1642 1643 @Test 1644 public void testInvalidPut() { 1645 try { 1646 getTable.get().put(new Put(Bytes.toBytes(0))); 1647 fail("Should fail since the put does not contain any cells"); 1648 } catch (IllegalArgumentException e) { 1649 assertThat(e.getMessage(), containsString("No columns to insert")); 1650 } 1651 1652 try { 1653 getTable.get() 1654 .put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE])); 1655 fail("Should fail since the put exceeds the max key value size"); 1656 } catch (IllegalArgumentException e) { 1657 assertThat(e.getMessage(), containsString("KeyValue size too large")); 1658 } 1659 } 1660 1661 @Test 1662 public void testInvalidPutInRowMutations() throws IOException { 1663 final byte[] row = Bytes.toBytes(0); 1664 try { 1665 getTable.get().mutateRow(new RowMutations(row).add(new Put(row))); 1666 fail("Should fail since the put does not contain any cells"); 1667 } catch (IllegalArgumentException e) { 1668 assertThat(e.getMessage(), containsString("No columns to insert")); 1669 } 1670 1671 try { 1672 getTable.get().mutateRow(new RowMutations(row) 1673 .add(new Put(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]))); 1674 fail("Should fail since the put exceeds the max key value size"); 1675 } catch (IllegalArgumentException e) { 1676 assertThat(e.getMessage(), containsString("KeyValue size too large")); 1677 } 1678 } 1679 1680 @Test 1681 public void testInvalidPutInRowMutationsInCheckAndMutate() throws IOException { 1682 final byte[] row = Bytes.toBytes(0); 1683 try { 1684 getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER) 1685 .build(new RowMutations(row).add(new Put(row)))); 1686 fail("Should fail since the put does not contain any cells"); 1687 } catch (IllegalArgumentException e) { 1688 assertThat(e.getMessage(), containsString("No columns to insert")); 1689 } 1690 1691 try { 1692 getTable.get().checkAndMutate( 1693 CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER).build(new RowMutations(row) 1694 .add(new Put(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE])))); 1695 fail("Should fail since the put exceeds the max key value size"); 1696 } catch (IllegalArgumentException e) { 1697 assertThat(e.getMessage(), containsString("KeyValue size too large")); 1698 } 1699 } 1700}