001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.Assert.assertArrayEquals; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNotNull; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.List; 033import java.util.concurrent.CompletableFuture; 034import java.util.concurrent.ExecutionException; 035import java.util.concurrent.TimeUnit; 036import java.util.stream.Collectors; 037import java.util.stream.IntStream; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.testclassification.ClientTests; 042import org.apache.hadoop.hbase.testclassification.MediumTests; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.junit.AfterClass; 045import org.junit.BeforeClass; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049 050import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 051import org.apache.hbase.thirdparty.io.netty.util.Timeout; 052 053@Category({ MediumTests.class, ClientTests.class }) 054public class TestAsyncBufferMutator { 055 056 @ClassRule 057 public static final HBaseClassTestRule CLASS_RULE = 058 HBaseClassTestRule.forClass(TestAsyncBufferMutator.class); 059 060 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 061 062 private static TableName TABLE_NAME = TableName.valueOf("async"); 063 064 private static TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("async-multi-region"); 065 066 private static byte[] CF = Bytes.toBytes("cf"); 067 068 private static byte[] CQ = Bytes.toBytes("cq"); 069 070 private static int COUNT = 100; 071 072 private static byte[] VALUE = new byte[1024]; 073 074 private static AsyncConnection CONN; 075 076 @BeforeClass 077 public static void setUp() throws Exception { 078 TEST_UTIL.startMiniCluster(1); 079 TEST_UTIL.createTable(TABLE_NAME, CF); 080 TEST_UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, CF); 081 CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 082 Bytes.random(VALUE); 083 } 084 085 @AfterClass 086 public static void tearDown() throws Exception { 087 CONN.close(); 088 TEST_UTIL.shutdownMiniCluster(); 089 } 090 091 @Test 092 public void testWithMultiRegionTable() throws InterruptedException { 093 test(MULTI_REGION_TABLE_NAME); 094 } 095 096 @Test 097 public void testWithSingleRegionTable() throws InterruptedException { 098 test(TABLE_NAME); 099 } 100 101 private void test(TableName tableName) throws InterruptedException { 102 List<CompletableFuture<Void>> futures = new ArrayList<>(); 103 try (AsyncBufferedMutator mutator = 104 CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) { 105 List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2) 106 .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)) 107 .collect(Collectors.toList())); 108 // exceeded the write buffer size, a flush will be called directly 109 fs.forEach(f -> f.join()); 110 IntStream.range(COUNT / 2, COUNT).forEach(i -> { 111 futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))); 112 }); 113 // the first future should have been sent out. 114 futures.get(0).join(); 115 Thread.sleep(2000); 116 // the last one should still be in write buffer 117 assertFalse(futures.get(futures.size() - 1).isDone()); 118 } 119 // mutator.close will call mutator.flush automatically so all tasks should have been done. 120 futures.forEach(f -> f.join()); 121 AsyncTable<?> table = CONN.getTable(tableName); 122 IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join()) 123 .forEach(r -> { 124 assertArrayEquals(VALUE, r.getValue(CF, CQ)); 125 }); 126 } 127 128 @Test 129 public void testClosedMutate() throws InterruptedException { 130 AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME); 131 mutator.close(); 132 Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE); 133 try { 134 mutator.mutate(put).get(); 135 fail("Close check failed"); 136 } catch (ExecutionException e) { 137 assertThat(e.getCause(), instanceOf(IOException.class)); 138 assertTrue(e.getCause().getMessage().startsWith("Already closed")); 139 } 140 for (CompletableFuture<Void> f : mutator.mutate(Arrays.asList(put))) { 141 try { 142 f.get(); 143 fail("Close check failed"); 144 } catch (ExecutionException e) { 145 assertThat(e.getCause(), instanceOf(IOException.class)); 146 assertTrue(e.getCause().getMessage().startsWith("Already closed")); 147 } 148 } 149 } 150 151 @Test 152 public void testNoPeriodicFlush() throws InterruptedException, ExecutionException { 153 try (AsyncBufferedMutator mutator = 154 CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build()) { 155 Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE); 156 CompletableFuture<?> future = mutator.mutate(put); 157 Thread.sleep(2000); 158 // assert that we have not flushed it out 159 assertFalse(future.isDone()); 160 mutator.flush(); 161 future.get(); 162 } 163 AsyncTable<?> table = CONN.getTable(TABLE_NAME); 164 assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ)); 165 } 166 167 @Test 168 public void testPeriodicFlush() throws InterruptedException, ExecutionException { 169 AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME) 170 .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build(); 171 Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE); 172 CompletableFuture<?> future = mutator.mutate(put); 173 future.get(); 174 AsyncTable<?> table = CONN.getTable(TABLE_NAME); 175 assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ)); 176 } 177 178 // a bit deep into the implementation 179 @Test 180 public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException { 181 Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE); 182 try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN 183 .getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS) 184 .setWriteBufferSize(10 * put.heapSize()).build()) { 185 List<CompletableFuture<?>> futures = new ArrayList<>(); 186 futures.add(mutator.mutate(put)); 187 Timeout task = mutator.periodicFlushTask; 188 // we should have scheduled a periodic flush task 189 assertNotNull(task); 190 for (int i = 1;; i++) { 191 futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))); 192 if (mutator.periodicFlushTask == null) { 193 break; 194 } 195 } 196 assertTrue(task.isCancelled()); 197 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); 198 AsyncTable<?> table = CONN.getTable(TABLE_NAME); 199 for (int i = 0; i < futures.size(); i++) { 200 assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(i))).get().getValue(CF, CQ)); 201 } 202 } 203 } 204 205 @Test 206 public void testCancelPeriodicFlushByManuallyFlush() 207 throws InterruptedException, ExecutionException { 208 try (AsyncBufferedMutatorImpl mutator = 209 (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME) 210 .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) { 211 CompletableFuture<?> future = 212 mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE)); 213 Timeout task = mutator.periodicFlushTask; 214 // we should have scheduled a periodic flush task 215 assertNotNull(task); 216 mutator.flush(); 217 assertTrue(task.isCancelled()); 218 future.get(); 219 AsyncTable<?> table = CONN.getTable(TABLE_NAME); 220 assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ)); 221 } 222 } 223 224 @Test 225 public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException { 226 CompletableFuture<?> future; 227 Timeout task; 228 try (AsyncBufferedMutatorImpl mutator = 229 (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME) 230 .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) { 231 future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE)); 232 task = mutator.periodicFlushTask; 233 // we should have scheduled a periodic flush task 234 assertNotNull(task); 235 } 236 assertTrue(task.isCancelled()); 237 future.get(); 238 AsyncTable<?> table = CONN.getTable(TABLE_NAME); 239 assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ)); 240 } 241 242 private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl { 243 244 private int flushCount; 245 246 AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table, 247 long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) { 248 super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize); 249 } 250 251 @Override 252 protected void internalFlush() { 253 flushCount++; 254 super.internalFlush(); 255 } 256 } 257 258 @Test 259 public void testRaceBetweenNormalFlushAndPeriodicFlush() 260 throws InterruptedException, ExecutionException { 261 Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE); 262 try (AsyncBufferMutatorForTest mutator = 263 new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME), 264 10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024)) { 265 CompletableFuture<?> future = mutator.mutate(put); 266 Timeout task = mutator.periodicFlushTask; 267 // we should have scheduled a periodic flush task 268 assertNotNull(task); 269 synchronized (mutator) { 270 // synchronized on mutator to prevent periodic flush to be executed 271 Thread.sleep(500); 272 // the timeout should be issued 273 assertTrue(task.isExpired()); 274 // but no flush is issued as we hold the lock 275 assertEquals(0, mutator.flushCount); 276 assertFalse(future.isDone()); 277 // manually flush, then release the lock 278 mutator.flush(); 279 } 280 // this is a bit deep into the implementation in netty but anyway let's add a check here to 281 // confirm that an issued timeout can not be canceled by netty framework. 282 assertFalse(task.isCancelled()); 283 // and the mutation is done 284 future.get(); 285 AsyncTable<?> table = CONN.getTable(TABLE_NAME); 286 assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ)); 287 // only the manual flush, the periodic flush should have been canceled by us 288 assertEquals(1, mutator.flushCount); 289 } 290 } 291}