001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.hamcrest.Matchers.allOf; 023import static org.hamcrest.Matchers.anyOf; 024import static org.hamcrest.Matchers.containsString; 025import static org.hamcrest.Matchers.endsWith; 026import static org.hamcrest.Matchers.hasItem; 027import static org.hamcrest.Matchers.hasProperty; 028import static org.hamcrest.Matchers.is; 029import static org.hamcrest.Matchers.isA; 030import static org.junit.Assert.assertEquals; 031import static org.junit.Assert.assertThrows; 032import static org.junit.Assert.fail; 033 034import io.opentelemetry.sdk.trace.data.SpanData; 035import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; 036import java.io.IOException; 037import java.io.UncheckedIOException; 038import java.util.Arrays; 039import java.util.List; 040import java.util.Objects; 041import java.util.concurrent.ExecutionException; 042import java.util.concurrent.ForkJoinPool; 043import java.util.concurrent.TimeUnit; 044import java.util.function.Supplier; 045import java.util.stream.Collectors; 046import java.util.stream.IntStream; 047import java.util.stream.Stream; 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.hbase.ConnectionRule; 050import org.apache.hadoop.hbase.HBaseTestingUtility; 051import org.apache.hadoop.hbase.MatcherPredicate; 052import org.apache.hadoop.hbase.MiniClusterRule; 053import org.apache.hadoop.hbase.StartMiniClusterOption; 054import org.apache.hadoop.hbase.TableName; 055import org.apache.hadoop.hbase.Waiter; 056import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException; 057import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 058import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule; 059import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule; 060import org.apache.hadoop.hbase.trace.TraceUtil; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.JVMClusterUtil; 063import org.apache.hadoop.hbase.util.Pair; 064import org.hamcrest.Matcher; 065import org.junit.ClassRule; 066import org.junit.Rule; 067import org.junit.Test; 068import org.junit.rules.ExternalResource; 069import org.junit.rules.RuleChain; 070import org.junit.rules.TestName; 071import org.junit.rules.TestRule; 072 073public abstract class AbstractTestAsyncTableScan { 074 075 protected static final OpenTelemetryClassRule OTEL_CLASS_RULE = OpenTelemetryClassRule.create(); 076 protected static final MiniClusterRule MINI_CLUSTER_RULE = MiniClusterRule.newBuilder() 077 .setMiniClusterOption(StartMiniClusterOption.builder().numWorkers(3).build()).build(); 078 079 protected static final ConnectionRule CONN_RULE = 080 ConnectionRule.createAsyncConnectionRule(MINI_CLUSTER_RULE::createAsyncConnection); 081 082 private static final class Setup extends ExternalResource { 083 @Override 084 protected void before() throws Throwable { 085 final HBaseTestingUtility testingUtil = MINI_CLUSTER_RULE.getTestingUtility(); 086 final AsyncConnection conn = CONN_RULE.getAsyncConnection(); 087 088 byte[][] splitKeys = new byte[8][]; 089 for (int i = 111; i < 999; i += 111) { 090 splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); 091 } 092 testingUtil.createTable(TABLE_NAME, FAMILY, splitKeys); 093 testingUtil.waitTableAvailable(TABLE_NAME); 094 conn.getTable(TABLE_NAME) 095 .putAll(IntStream.range(0, COUNT) 096 .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) 097 .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))) 098 .collect(Collectors.toList())) 099 .get(); 100 } 101 } 102 103 @ClassRule 104 public static final TestRule classRule = RuleChain.outerRule(OTEL_CLASS_RULE) 105 .around(MINI_CLUSTER_RULE).around(CONN_RULE).around(new Setup()); 106 107 @Rule 108 public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(OTEL_CLASS_RULE); 109 110 @Rule 111 public final TestName testName = new TestName(); 112 113 protected static TableName TABLE_NAME = TableName.valueOf("async"); 114 115 protected static byte[] FAMILY = Bytes.toBytes("cf"); 116 117 protected static byte[] CQ1 = Bytes.toBytes("cq1"); 118 119 protected static byte[] CQ2 = Bytes.toBytes("cq2"); 120 121 protected static int COUNT = 1000; 122 123 private static Scan createNormalScan() { 124 return new Scan(); 125 } 126 127 private static Scan createBatchScan() { 128 return new Scan().setBatch(1); 129 } 130 131 // set a small result size for testing flow control 132 private static Scan createSmallResultSizeScan() { 133 return new Scan().setMaxResultSize(1); 134 } 135 136 private static Scan createBatchSmallResultSizeScan() { 137 return new Scan().setBatch(1).setMaxResultSize(1); 138 } 139 140 private static AsyncTable<?> getRawTable() { 141 return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME); 142 } 143 144 private static AsyncTable<?> getTable() { 145 return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool()); 146 } 147 148 private static List<Pair<String, Supplier<Scan>>> getScanCreator() { 149 return Arrays.asList(Pair.newPair("normal", AbstractTestAsyncTableScan::createNormalScan), 150 Pair.newPair("batch", AbstractTestAsyncTableScan::createBatchScan), 151 Pair.newPair("smallResultSize", AbstractTestAsyncTableScan::createSmallResultSizeScan), 152 Pair.newPair("batchSmallResultSize", 153 AbstractTestAsyncTableScan::createBatchSmallResultSizeScan)); 154 } 155 156 protected static List<Object[]> getScanCreatorParams() { 157 return getScanCreator().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() }) 158 .collect(Collectors.toList()); 159 } 160 161 private static List<Pair<String, Supplier<AsyncTable<?>>>> getTableCreator() { 162 return Arrays.asList(Pair.newPair("raw", AbstractTestAsyncTableScan::getRawTable), 163 Pair.newPair("normal", AbstractTestAsyncTableScan::getTable)); 164 } 165 166 protected static List<Object[]> getTableAndScanCreatorParams() { 167 List<Pair<String, Supplier<AsyncTable<?>>>> tableCreator = getTableCreator(); 168 List<Pair<String, Supplier<Scan>>> scanCreator = getScanCreator(); 169 return tableCreator.stream() 170 .flatMap(tp -> scanCreator.stream() 171 .map(sp -> new Object[] { tp.getFirst(), tp.getSecond(), sp.getFirst(), sp.getSecond() })) 172 .collect(Collectors.toList()); 173 } 174 175 protected abstract Scan createScan(); 176 177 protected abstract List<Result> doScan(Scan scan, int closeAfter) throws Exception; 178 179 /** 180 * Used by implementation classes to assert the correctness of spans produced under test. 181 */ 182 protected abstract void assertTraceContinuity(); 183 184 /** 185 * Used by implementation classes to assert the correctness of spans having errors. 186 */ 187 protected abstract void 188 assertTraceError(final Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher); 189 190 protected final List<Result> convertFromBatchResult(List<Result> results) { 191 assertEquals(0, results.size() % 2); 192 return IntStream.range(0, results.size() / 2).mapToObj(i -> { 193 try { 194 return Result 195 .createCompleteResult(Arrays.asList(results.get(2 * i), results.get(2 * i + 1))); 196 } catch (IOException e) { 197 throw new UncheckedIOException(e); 198 } 199 }).collect(Collectors.toList()); 200 } 201 202 protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) { 203 final Configuration conf = MINI_CLUSTER_RULE.getTestingUtility().getConfiguration(); 204 Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>( 205 "Span for test failed to complete.", OTEL_CLASS_RULE::getSpans, hasItem(parentSpanMatcher))); 206 } 207 208 protected static Stream<SpanData> spanStream() { 209 return OTEL_CLASS_RULE.getSpans().stream().filter(Objects::nonNull); 210 } 211 212 @Test 213 public void testScanAll() throws Exception { 214 List<Result> results = doScan(createScan(), -1); 215 // make sure all scanners are closed at RS side 216 MINI_CLUSTER_RULE.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream() 217 .map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach( 218 rs -> assertEquals( 219 "The scanner count of " + rs.getServerName() + " is " 220 + rs.getRSRpcServices().getScannersCount(), 221 0, rs.getRSRpcServices().getScannersCount())); 222 assertEquals(COUNT, results.size()); 223 IntStream.range(0, COUNT).forEach(i -> { 224 Result result = results.get(i); 225 assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); 226 assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1))); 227 }); 228 } 229 230 private void assertResultEquals(Result result, int i) { 231 assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); 232 assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1))); 233 assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2))); 234 } 235 236 @Test 237 public void testReversedScanAll() throws Exception { 238 List<Result> results = 239 TraceUtil.trace(() -> doScan(createScan().setReversed(true), -1), testName.getMethodName()); 240 assertEquals(COUNT, results.size()); 241 IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1)); 242 assertTraceContinuity(); 243 } 244 245 @Test 246 public void testScanNoStopKey() throws Exception { 247 int start = 345; 248 List<Result> results = TraceUtil.trace( 249 () -> doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1), 250 testName.getMethodName()); 251 assertEquals(COUNT - start, results.size()); 252 IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i)); 253 assertTraceContinuity(); 254 } 255 256 @Test 257 public void testReverseScanNoStopKey() throws Exception { 258 int start = 765; 259 final Scan scan = 260 createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true); 261 List<Result> results = TraceUtil.trace(() -> doScan(scan, -1), testName.getMethodName()); 262 assertEquals(start + 1, results.size()); 263 IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i)); 264 assertTraceContinuity(); 265 } 266 267 @Test 268 public void testScanWrongColumnFamily() { 269 final Exception e = assertThrows(Exception.class, 270 () -> TraceUtil.trace( 271 () -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1), 272 testName.getMethodName())); 273 // hamcrest generic enforcement for `anyOf` is a pain; skip it 274 // but -- don't we always unwrap ExecutionExceptions -- bug? 275 if (e instanceof NoSuchColumnFamilyException) { 276 final NoSuchColumnFamilyException ex = (NoSuchColumnFamilyException) e; 277 assertThat(ex, isA(NoSuchColumnFamilyException.class)); 278 } else if (e instanceof ExecutionException) { 279 final ExecutionException ex = (ExecutionException) e; 280 assertThat(ex, allOf(isA(ExecutionException.class), 281 hasProperty("cause", isA(NoSuchColumnFamilyException.class)))); 282 } else { 283 fail("Found unexpected Exception " + e); 284 } 285 assertTraceError(anyOf( 286 containsEntry(is(SemanticAttributes.EXCEPTION_TYPE), 287 endsWith(NoSuchColumnFamilyException.class.getName())), 288 allOf( 289 containsEntry(is(SemanticAttributes.EXCEPTION_TYPE), 290 endsWith(RemoteWithExtrasException.class.getName())), 291 containsEntry(is(SemanticAttributes.EXCEPTION_MESSAGE), 292 containsString(NoSuchColumnFamilyException.class.getName()))))); 293 } 294 295 private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, 296 int limit) throws Exception { 297 testScan(start, startInclusive, stop, stopInclusive, limit, -1); 298 } 299 300 private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, 301 int limit, int closeAfter) throws Exception { 302 Scan scan = 303 createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) 304 .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive); 305 if (limit > 0) { 306 scan.setLimit(limit); 307 } 308 List<Result> results = doScan(scan, closeAfter); 309 int actualStart = startInclusive ? start : start + 1; 310 int actualStop = stopInclusive ? stop + 1 : stop; 311 int count = actualStop - actualStart; 312 if (limit > 0) { 313 count = Math.min(count, limit); 314 } 315 if (closeAfter > 0) { 316 count = Math.min(count, closeAfter); 317 } 318 assertEquals(count, results.size()); 319 IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i)); 320 } 321 322 private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive, 323 int limit) throws Exception { 324 Scan scan = 325 createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) 326 .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true); 327 if (limit > 0) { 328 scan.setLimit(limit); 329 } 330 List<Result> results = doScan(scan, -1); 331 int actualStart = startInclusive ? start : start - 1; 332 int actualStop = stopInclusive ? stop - 1 : stop; 333 int count = actualStart - actualStop; 334 if (limit > 0) { 335 count = Math.min(count, limit); 336 } 337 assertEquals(count, results.size()); 338 IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i)); 339 } 340 341 @Test 342 public void testScanWithStartKeyAndStopKey() throws Exception { 343 testScan(1, true, 998, false, -1); // from first region to last region 344 testScan(123, true, 345, true, -1); 345 testScan(234, true, 456, false, -1); 346 testScan(345, false, 567, true, -1); 347 testScan(456, false, 678, false, -1); 348 } 349 350 @Test 351 public void testReversedScanWithStartKeyAndStopKey() throws Exception { 352 testReversedScan(998, true, 1, false, -1); // from last region to first region 353 testReversedScan(543, true, 321, true, -1); 354 testReversedScan(654, true, 432, false, -1); 355 testReversedScan(765, false, 543, true, -1); 356 testReversedScan(876, false, 654, false, -1); 357 } 358 359 @Test 360 public void testScanAtRegionBoundary() throws Exception { 361 testScan(222, true, 333, true, -1); 362 testScan(333, true, 444, false, -1); 363 testScan(444, false, 555, true, -1); 364 testScan(555, false, 666, false, -1); 365 } 366 367 @Test 368 public void testReversedScanAtRegionBoundary() throws Exception { 369 testReversedScan(333, true, 222, true, -1); 370 testReversedScan(444, true, 333, false, -1); 371 testReversedScan(555, false, 444, true, -1); 372 testReversedScan(666, false, 555, false, -1); 373 } 374 375 @Test 376 public void testScanWithLimit() throws Exception { 377 testScan(1, true, 998, false, 900); // from first region to last region 378 testScan(123, true, 234, true, 100); 379 testScan(234, true, 456, false, 100); 380 testScan(345, false, 567, true, 100); 381 testScan(456, false, 678, false, 100); 382 } 383 384 @Test 385 public void testScanWithLimitGreaterThanActualCount() throws Exception { 386 testScan(1, true, 998, false, 1000); // from first region to last region 387 testScan(123, true, 345, true, 200); 388 testScan(234, true, 456, false, 200); 389 testScan(345, false, 567, true, 200); 390 testScan(456, false, 678, false, 200); 391 } 392 393 @Test 394 public void testReversedScanWithLimit() throws Exception { 395 testReversedScan(998, true, 1, false, 900); // from last region to first region 396 testReversedScan(543, true, 321, true, 100); 397 testReversedScan(654, true, 432, false, 100); 398 testReversedScan(765, false, 543, true, 100); 399 testReversedScan(876, false, 654, false, 100); 400 } 401 402 @Test 403 public void testReversedScanWithLimitGreaterThanActualCount() throws Exception { 404 testReversedScan(998, true, 1, false, 1000); // from last region to first region 405 testReversedScan(543, true, 321, true, 200); 406 testReversedScan(654, true, 432, false, 200); 407 testReversedScan(765, false, 543, true, 200); 408 testReversedScan(876, false, 654, false, 200); 409 } 410 411 @Test 412 public void testScanEndingEarly() throws Exception { 413 testScan(1, true, 998, false, 0, 900); // from first region to last region 414 testScan(123, true, 234, true, 0, 100); 415 testScan(234, true, 456, false, 0, 100); 416 testScan(345, false, 567, true, 0, 100); 417 testScan(456, false, 678, false, 0, 100); 418 } 419}