001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import java.util.Collection; 025import java.util.EnumSet; 026import java.util.HashMap; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Optional; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.ClusterMetrics.Option; 033import org.apache.hadoop.hbase.CompareOperator; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.RegionMetrics; 037import org.apache.hadoop.hbase.ServerMetrics; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.Append; 042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 043import org.apache.hadoop.hbase.client.Delete; 044import org.apache.hadoop.hbase.client.Get; 045import org.apache.hadoop.hbase.client.Increment; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.ResultScanner; 050import org.apache.hadoop.hbase.client.RowMutations; 051import org.apache.hadoop.hbase.client.Scan; 052import org.apache.hadoop.hbase.client.Table; 053import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 054import org.apache.hadoop.hbase.coprocessor.ObserverContext; 055import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 056import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 057import org.apache.hadoop.hbase.coprocessor.RegionObserver; 058import org.apache.hadoop.hbase.filter.BinaryComparator; 059import org.apache.hadoop.hbase.filter.RowFilter; 060import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 061import org.apache.hadoop.hbase.testclassification.MediumTests; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.junit.AfterClass; 064import org.junit.BeforeClass; 065import org.junit.ClassRule; 066import org.junit.Ignore; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.slf4j.Logger; 070import org.slf4j.LoggerFactory; 071 072@Ignore // Depends on Master being able to host regions. Needs fixing. 073@Category(MediumTests.class) 074public class TestRegionServerReadRequestMetrics { 075 076 @ClassRule 077 public static final HBaseClassTestRule CLASS_RULE = 078 HBaseClassTestRule.forClass(TestRegionServerReadRequestMetrics.class); 079 080 private static final Logger LOG = 081 LoggerFactory.getLogger(TestRegionServerReadRequestMetrics.class); 082 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 083 private static final TableName TABLE_NAME = TableName.valueOf("test"); 084 private static final byte[] CF1 = Bytes.toBytes("c1"); 085 private static final byte[] CF2 = Bytes.toBytes("c2"); 086 087 private static final byte[] ROW1 = Bytes.toBytes("a"); 088 private static final byte[] ROW2 = Bytes.toBytes("b"); 089 private static final byte[] ROW3 = Bytes.toBytes("c"); 090 private static final byte[] COL1 = Bytes.toBytes("q1"); 091 private static final byte[] COL2 = Bytes.toBytes("q2"); 092 private static final byte[] COL3 = Bytes.toBytes("q3"); 093 private static final byte[] VAL1 = Bytes.toBytes("v1"); 094 private static final byte[] VAL2 = Bytes.toBytes("v2"); 095 private static final byte[] VAL3 = Bytes.toBytes(0L); 096 097 private static final int MAX_TRY = 20; 098 private static final int SLEEP_MS = 100; 099 private static final int TTL = 1; 100 101 private static Admin admin; 102 private static Collection<ServerName> serverNames; 103 private static Table table; 104 private static RegionInfo regionInfo; 105 106 private static Map<Metric, Long> requestsMap = new HashMap<>(); 107 private static Map<Metric, Long> requestsMapPrev = new HashMap<>(); 108 109 @BeforeClass 110 public static void setUpOnce() throws Exception { 111 TEST_UTIL.startMiniCluster(); 112 admin = TEST_UTIL.getAdmin(); 113 serverNames = 114 admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().keySet(); 115 table = createTable(); 116 putData(); 117 List<RegionInfo> regions = admin.getRegions(TABLE_NAME); 118 assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regions.size()); 119 regionInfo = regions.get(0); 120 121 for (Metric metric : Metric.values()) { 122 requestsMap.put(metric, 0L); 123 requestsMapPrev.put(metric, 0L); 124 } 125 } 126 127 private static Table createTable() throws IOException { 128 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME); 129 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1)); 130 builder 131 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2).setTimeToLive(TTL).build()); 132 admin.createTable(builder.build()); 133 return TEST_UTIL.getConnection().getTable(TABLE_NAME); 134 } 135 136 private static void testReadRequests(long resultCount, long expectedReadRequests, 137 long expectedFilteredReadRequests) throws IOException, InterruptedException { 138 updateMetricsMap(); 139 System.out.println("requestsMapPrev = " + requestsMapPrev); 140 System.out.println("requestsMap = " + requestsMap); 141 142 assertEquals(expectedReadRequests, 143 requestsMap.get(Metric.REGION_READ) - requestsMapPrev.get(Metric.REGION_READ)); 144 assertEquals(expectedFilteredReadRequests, requestsMap.get(Metric.FILTERED_REGION_READ) 145 - requestsMapPrev.get(Metric.FILTERED_REGION_READ)); 146 assertEquals(expectedFilteredReadRequests, requestsMap.get(Metric.FILTERED_SERVER_READ) 147 - requestsMapPrev.get(Metric.FILTERED_SERVER_READ)); 148 assertEquals(expectedReadRequests, resultCount); 149 } 150 151 private static void updateMetricsMap() throws IOException, InterruptedException { 152 for (Metric metric : Metric.values()) { 153 requestsMapPrev.put(metric, requestsMap.get(metric)); 154 } 155 156 ServerMetrics serverMetrics = null; 157 RegionMetrics regionMetricsOuter = null; 158 boolean metricsUpdated = false; 159 for (int i = 0; i < MAX_TRY; i++) { 160 for (ServerName serverName : serverNames) { 161 serverMetrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) 162 .getLiveServerMetrics().get(serverName); 163 164 Map<byte[], RegionMetrics> regionMetrics = serverMetrics.getRegionMetrics(); 165 RegionMetrics regionMetric = regionMetrics.get(regionInfo.getRegionName()); 166 if (regionMetric != null) { 167 regionMetricsOuter = regionMetric; 168 for (Metric metric : Metric.values()) { 169 if (getReadRequest(serverMetrics, regionMetric, metric) > requestsMapPrev.get(metric)) { 170 for (Metric metricInner : Metric.values()) { 171 requestsMap.put(metricInner, 172 getReadRequest(serverMetrics, regionMetric, metricInner)); 173 } 174 metricsUpdated = true; 175 break; 176 } 177 } 178 } 179 } 180 if (metricsUpdated) { 181 break; 182 } 183 Thread.sleep(SLEEP_MS); 184 } 185 if (!metricsUpdated) { 186 for (Metric metric : Metric.values()) { 187 requestsMap.put(metric, getReadRequest(serverMetrics, regionMetricsOuter, metric)); 188 } 189 } 190 } 191 192 private static long getReadRequest(ServerMetrics serverMetrics, RegionMetrics regionMetrics, 193 Metric metric) { 194 switch (metric) { 195 case REGION_READ: 196 return regionMetrics.getReadRequestCount(); 197 case SERVER_READ: 198 return serverMetrics.getRegionMetrics().get(regionMetrics.getRegionName()) 199 .getReadRequestCount(); 200 case FILTERED_REGION_READ: 201 return regionMetrics.getFilteredReadRequestCount(); 202 case FILTERED_SERVER_READ: 203 return serverMetrics.getRegionMetrics().get(regionMetrics.getRegionName()) 204 .getFilteredReadRequestCount(); 205 default: 206 throw new IllegalStateException(); 207 } 208 } 209 210 private static void putData() throws IOException { 211 Put put; 212 213 put = new Put(ROW1); 214 put.addColumn(CF1, COL1, VAL1); 215 put.addColumn(CF1, COL2, VAL2); 216 put.addColumn(CF1, COL3, VAL3); 217 table.put(put); 218 put = new Put(ROW2); 219 put.addColumn(CF1, COL1, VAL2); // put val2 instead of val1 220 put.addColumn(CF1, COL2, VAL2); 221 table.put(put); 222 put = new Put(ROW3); 223 put.addColumn(CF1, COL1, VAL1); 224 put.addColumn(CF1, COL2, VAL2); 225 table.put(put); 226 } 227 228 private static void putTTLExpiredData() throws IOException, InterruptedException { 229 Put put; 230 231 put = new Put(ROW1); 232 put.addColumn(CF2, COL1, VAL1); 233 put.addColumn(CF2, COL2, VAL2); 234 table.put(put); 235 236 Thread.sleep(TTL * 1000); 237 238 put = new Put(ROW2); 239 put.addColumn(CF2, COL1, VAL1); 240 put.addColumn(CF2, COL2, VAL2); 241 table.put(put); 242 243 put = new Put(ROW3); 244 put.addColumn(CF2, COL1, VAL1); 245 put.addColumn(CF2, COL2, VAL2); 246 table.put(put); 247 } 248 249 @AfterClass 250 public static void tearDownOnce() throws Exception { 251 TEST_UTIL.shutdownMiniCluster(); 252 } 253 254 @Test 255 public void testReadRequestsCountNotFiltered() throws Exception { 256 int resultCount; 257 Scan scan; 258 Append append; 259 Put put; 260 Increment increment; 261 Get get; 262 263 // test for scan 264 scan = new Scan(); 265 try (ResultScanner scanner = table.getScanner(scan)) { 266 resultCount = 0; 267 for (Result ignore : scanner) { 268 resultCount++; 269 } 270 testReadRequests(resultCount, 3, 0); 271 } 272 273 // test for scan 274 scan = new Scan().withStartRow(ROW2).withStopRow(ROW3); 275 try (ResultScanner scanner = table.getScanner(scan)) { 276 resultCount = 0; 277 for (Result ignore : scanner) { 278 resultCount++; 279 } 280 testReadRequests(resultCount, 1, 0); 281 } 282 283 // test for get 284 get = new Get(ROW2); 285 Result result = table.get(get); 286 resultCount = result.isEmpty() ? 0 : 1; 287 testReadRequests(resultCount, 1, 0); 288 289 // test for increment 290 increment = new Increment(ROW1); 291 increment.addColumn(CF1, COL3, 1); 292 result = table.increment(increment); 293 resultCount = result.isEmpty() ? 0 : 1; 294 testReadRequests(resultCount, 1, 0); 295 296 // test for checkAndPut 297 put = new Put(ROW1); 298 put.addColumn(CF1, COL2, VAL2); 299 boolean checkAndPut = 300 table.checkAndMutate(ROW1, CF1).qualifier(COL2).ifEquals(VAL2).thenPut(put); 301 resultCount = checkAndPut ? 1 : 0; 302 testReadRequests(resultCount, 1, 0); 303 304 // test for append 305 append = new Append(ROW1); 306 append.addColumn(CF1, COL2, VAL2); 307 result = table.append(append); 308 resultCount = result.isEmpty() ? 0 : 1; 309 testReadRequests(resultCount, 1, 0); 310 311 // test for checkAndMutate 312 put = new Put(ROW1); 313 put.addColumn(CF1, COL1, VAL1); 314 RowMutations rm = new RowMutations(ROW1); 315 rm.add(put); 316 boolean checkAndMutate = 317 table.checkAndMutate(ROW1, CF1).qualifier(COL1).ifEquals(VAL1).thenMutate(rm); 318 resultCount = checkAndMutate ? 1 : 0; 319 testReadRequests(resultCount, 1, 0); 320 } 321 322 @Ignore // HBASE-19785 323 @Test 324 public void testReadRequestsCountWithFilter() throws Exception { 325 int resultCount; 326 Scan scan; 327 328 // test for scan 329 scan = new Scan(); 330 scan.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareOperator.EQUAL, VAL1)); 331 try (ResultScanner scanner = table.getScanner(scan)) { 332 resultCount = 0; 333 for (Result ignore : scanner) { 334 resultCount++; 335 } 336 testReadRequests(resultCount, 2, 1); 337 } 338 339 // test for scan 340 scan = new Scan(); 341 scan.setFilter(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROW1))); 342 try (ResultScanner scanner = table.getScanner(scan)) { 343 resultCount = 0; 344 for (Result ignore : scanner) { 345 resultCount++; 346 } 347 testReadRequests(resultCount, 1, 2); 348 } 349 350 // test for scan 351 scan = new Scan().withStartRow(ROW2).withStopRow(ROW3); 352 scan.setFilter(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROW1))); 353 try (ResultScanner scanner = table.getScanner(scan)) { 354 resultCount = 0; 355 for (Result ignore : scanner) { 356 resultCount++; 357 } 358 testReadRequests(resultCount, 0, 1); 359 } 360 361 // fixme filtered get should not increase readRequestsCount 362 // Get get = new Get(ROW2); 363 // get.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1)); 364 // Result result = table.get(get); 365 // resultCount = result.isEmpty() ? 0 : 1; 366 // testReadRequests(resultCount, 0, 1); 367 } 368 369 @Ignore // HBASE-19785 370 @Test 371 public void testReadRequestsCountWithDeletedRow() throws Exception { 372 try { 373 Delete delete = new Delete(ROW3); 374 table.delete(delete); 375 376 Scan scan = new Scan(); 377 try (ResultScanner scanner = table.getScanner(scan)) { 378 int resultCount = 0; 379 for (Result ignore : scanner) { 380 resultCount++; 381 } 382 testReadRequests(resultCount, 2, 1); 383 } 384 } finally { 385 Put put = new Put(ROW3); 386 put.addColumn(CF1, COL1, VAL1); 387 put.addColumn(CF1, COL2, VAL2); 388 table.put(put); 389 } 390 } 391 392 @Test 393 public void testReadRequestsCountWithTTLExpiration() throws Exception { 394 putTTLExpiredData(); 395 396 Scan scan = new Scan(); 397 scan.addFamily(CF2); 398 try (ResultScanner scanner = table.getScanner(scan)) { 399 int resultCount = 0; 400 for (Result ignore : scanner) { 401 resultCount++; 402 } 403 testReadRequests(resultCount, 2, 1); 404 } 405 } 406 407 @Ignore // See HBASE-19785 408 @Test 409 public void testReadRequestsWithCoprocessor() throws Exception { 410 TableName tableName = TableName.valueOf("testReadRequestsWithCoprocessor"); 411 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 412 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1)); 413 builder.setCoprocessor(ScanRegionCoprocessor.class.getName()); 414 admin.createTable(builder.build()); 415 416 try { 417 TEST_UTIL.waitTableAvailable(tableName); 418 List<RegionInfo> regionInfos = admin.getRegions(tableName); 419 assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regionInfos.size()); 420 boolean success = true; 421 int i = 0; 422 for (; i < MAX_TRY; i++) { 423 try { 424 testReadRequests(regionInfos.get(0).getRegionName(), 3); 425 } catch (Throwable t) { 426 LOG.warn("Got exception when try " + i + " times", t); 427 Thread.sleep(SLEEP_MS); 428 success = false; 429 } 430 if (success) { 431 break; 432 } 433 } 434 if (i == MAX_TRY) { 435 fail("Failed to get right read requests metric after try " + i + " times"); 436 } 437 } finally { 438 admin.disableTable(tableName); 439 admin.deleteTable(tableName); 440 } 441 } 442 443 private void testReadRequests(byte[] regionName, int expectedReadRequests) throws Exception { 444 for (ServerName serverName : serverNames) { 445 ServerMetrics serverMetrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) 446 .getLiveServerMetrics().get(serverName); 447 Map<byte[], RegionMetrics> regionMetrics = serverMetrics.getRegionMetrics(); 448 RegionMetrics regionMetric = regionMetrics.get(regionName); 449 if (regionMetric != null) { 450 LOG.debug("server read request is " 451 + serverMetrics.getRegionMetrics().get(regionName).getReadRequestCount() 452 + ", region read request is " + regionMetric.getReadRequestCount()); 453 assertEquals(3, serverMetrics.getRegionMetrics().get(regionName).getReadRequestCount()); 454 assertEquals(3, regionMetric.getReadRequestCount()); 455 } 456 } 457 } 458 459 public static class ScanRegionCoprocessor implements RegionCoprocessor, RegionObserver { 460 @Override 461 public Optional<RegionObserver> getRegionObserver() { 462 return Optional.of(this); 463 } 464 465 @Override 466 public void postOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c) { 467 RegionCoprocessorEnvironment env = c.getEnvironment(); 468 Region region = env.getRegion(); 469 try { 470 putData(region); 471 RegionScanner scanner = region.getScanner(new Scan()); 472 List<Cell> result = new LinkedList<>(); 473 while (scanner.next(result)) { 474 result.clear(); 475 } 476 } catch (Exception e) { 477 LOG.warn("Got exception in coprocessor", e); 478 } 479 } 480 481 private void putData(Region region) throws Exception { 482 Put put = new Put(ROW1); 483 put.addColumn(CF1, COL1, VAL1); 484 region.put(put); 485 put = new Put(ROW2); 486 put.addColumn(CF1, COL1, VAL1); 487 region.put(put); 488 put = new Put(ROW3); 489 put.addColumn(CF1, COL1, VAL1); 490 region.put(put); 491 } 492 } 493 494 private enum Metric { 495 REGION_READ, 496 SERVER_READ, 497 FILTERED_REGION_READ, 498 FILTERED_SERVER_READ 499 } 500}