/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(LargeTests.class)
public class TestServerSideScanMetricsFromClientSide {
  private static final Logger LOG =
    LoggerFactory.getLogger(TestServerSideScanMetricsFromClientSide.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestServerSideScanMetricsFromClientSide.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static Table TABLE = null;

  /**
   * Table configuration
   */
  private static TableName TABLE_NAME = TableName.valueOf("testTable");

  private static int NUM_ROWS = 10;
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);

  // Should keep this value below 10 to keep generation of expected kv's simple. If above 10 then
  // table/row/cf1/... will be followed by table/row/cf10/... instead of table/row/cf2/... which
  // breaks the simple generation of expected kv's
  private static int NUM_FAMILIES = 1;
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);

  private static int NUM_QUALIFIERS = 1;
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);

  private static int VALUE_SIZE = 10;
  private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);

  private static int NUM_COLS = NUM_FAMILIES * NUM_QUALIFIERS;

  // Approximation of how large the heap size of cells in our table is. Should be accessed through
  // getCellHeapSize().
  private static long CELL_HEAP_SIZE = -1;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(3);
    TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
  }

  static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
    byte[][] qualifiers, byte[] cellValue) throws IOException {
    Table ht = TEST_UTIL.createTable(name, families);
    List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
    ht.put(puts);

    return ht;
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Make puts that write the input value into each combination of row, family, and qualifier
   * @param rows       the rows to use
   * @param families   the column families to use
   * @param qualifiers the column qualifiers to use
   * @param value      the value to put
   * @return the list of puts that were created
   * @throws IOException If an IO problem is encountered
   */
  static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
    byte[] value) throws IOException {
    Put put;
    ArrayList<Put> puts = new ArrayList<>();

    for (int row = 0; row < rows.length; row++) {
      put = new Put(rows[row]);
      for (int fam = 0; fam < families.length; fam++) {
        for (int qual = 0; qual < qualifiers.length; qual++) {
          KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
          put.add(kv);
        }
      }
      puts.add(put);
    }

    return puts;
  }

  /**
   * @return The approximate heap size of a cell in the test table.
   *         All cells should have approximately the same heap size, so the value is cached to
   *         avoid repeating the calculation
   * @throws Exception on unexpected failure
   */
  private long getCellHeapSize() throws Exception {
    if (CELL_HEAP_SIZE == -1) {
      // Do a partial scan that will return a single result with a single cell
      Scan scan = new Scan();
      scan.setMaxResultSize(1);
      scan.setAllowPartialResults(true);
      ResultScanner scanner = TABLE.getScanner(scan);

      Result result = scanner.next();

      assertTrue(result != null);
      assertTrue(result.rawCells() != null);
      assertTrue(result.rawCells().length == 1);

      CELL_HEAP_SIZE = result.rawCells()[0].heapSize();
      scanner.close();
    }

    return CELL_HEAP_SIZE;
  }

  @Test
  public void testRowsSeenMetricWithSync() throws Exception {
    testRowsSeenMetric(false);
  }

  @Test
  public void testRowsSeenMetricWithAsync() throws Exception {
    testRowsSeenMetric(true);
  }

  private void testRowsSeenMetric(boolean async) throws Exception {
    // Base scan configuration
    Scan baseScan;
    baseScan = new Scan();
    baseScan.setScanMetricsEnabled(true);
    baseScan.setAsyncPrefetch(async);
    try {
      testRowsSeenMetric(baseScan);

      // Test case where only a single result will be returned per RPC to the server
      baseScan.setCaching(1);
      testRowsSeenMetric(baseScan);

      // Test case where partial results are returned from the server. At most one cell will be
      // contained in each response
      baseScan.setMaxResultSize(1);
      testRowsSeenMetric(baseScan);

      // Test case where the size limit is set such that a few cells are returned per partial
      // result from the server
      baseScan.setCaching(NUM_ROWS);
      baseScan.setMaxResultSize(getCellHeapSize() * (NUM_COLS - 1));
      testRowsSeenMetric(baseScan);
    } catch (Throwable t) {
      LOG.error("FAIL", t);
      throw t;
    }
  }

  @Test
  public void testFsReadTimeMetric() throws Exception {
    // Write some new puts and flush, as an easy way to ensure the read blocks are not cached,
    // so that we go into the fs read code path
    List<Put> puts = createPuts(ROWS, FAMILIES, QUALIFIERS, VALUE);
    TABLE.put(puts);
    TEST_UTIL.flush(TABLE_NAME);
    Scan scan = new Scan();
    scan.setScanMetricsEnabled(true);
    testMetric(scan, ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME, 0, CompareOperator.GREATER);
  }

  public void testRowsSeenMetric(Scan baseScan) throws Exception {
    Scan scan;
    scan = new Scan(baseScan);
    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, NUM_ROWS);

    for (int i = 0; i < ROWS.length - 1; i++) {
      scan = new Scan(baseScan);
      scan.withStartRow(ROWS[0]);
      scan.withStopRow(ROWS[i + 1]);
      testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, i + 1);
    }

    for (int i = ROWS.length - 1; i > 0; i--) {
      scan = new Scan(baseScan);
      scan.withStartRow(ROWS[i - 1]);
      scan.withStopRow(ROWS[ROWS.length - 1]);
      testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME,
        ROWS.length - i);
    }

    // The filter should filter out all rows, but every row should still be counted as scanned.
    Filter filter =
      new RowFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("xyz")));
    scan = new Scan(baseScan);
    scan.setFilter(filter);
    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, ROWS.length);

    // Filter should pass on all rows
    SingleColumnValueFilter singleColumnValueFilter =
      new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.EQUAL, VALUE);
    scan = new Scan(baseScan);
    scan.setFilter(singleColumnValueFilter);
    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, ROWS.length);

    // Filter should filter out all rows
    singleColumnValueFilter =
      new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.NOT_EQUAL, VALUE);
    scan = new Scan(baseScan);
    scan.setFilter(singleColumnValueFilter);
    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, ROWS.length);
  }

  @Test
  public void testRowsFilteredMetric() throws Exception {
    // Base scan configuration
    Scan baseScan;
    baseScan = new Scan();
    baseScan.setScanMetricsEnabled(true);

    // Test case where scan uses default values
    testRowsFilteredMetric(baseScan);

    // Test case where at most one Result is retrieved per RPC
    baseScan.setCaching(1);
    testRowsFilteredMetric(baseScan);

    // Test case where size limit is very restrictive and partial results will be returned from
    // the server
    baseScan.setMaxResultSize(1);
    testRowsFilteredMetric(baseScan);

    // Test a case where max result size limits the response from the server to only a few cells
    // (not all cells from the row)
    baseScan.setCaching(NUM_ROWS);
    baseScan.setMaxResultSize(getCellHeapSize() * (NUM_COLS - 1));
    testRowsFilteredMetric(baseScan);
  }

  public void testRowsFilteredMetric(Scan baseScan) throws Exception {
    testRowsFilteredMetric(baseScan, null, 0);

    // Row filter doesn't match any row key. All rows should be filtered
    Filter filter =
      new RowFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("xyz")));
    testRowsFilteredMetric(baseScan, filter, ROWS.length);

    // Filter will return results containing only the first key. Number of entire rows filtered
    // should be 0.
    filter = new FirstKeyOnlyFilter();
    testRowsFilteredMetric(baseScan, filter, 0);

    // Column prefix will find some matching qualifier on each row. Number of entire rows filtered
    // should be 0
    filter = new ColumnPrefixFilter(QUALIFIERS[0]);
    testRowsFilteredMetric(baseScan, filter, 0);

    // Column prefix will NOT find any matching qualifier on any row. All rows should be filtered
    filter = new ColumnPrefixFilter(Bytes.toBytes("xyz"));
    testRowsFilteredMetric(baseScan, filter, ROWS.length);

    // Matching column value should exist in each row. No rows should be filtered.
    filter = new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.EQUAL, VALUE);
    testRowsFilteredMetric(baseScan, filter, 0);

    // No matching column value should exist in any row, so all rows should be filtered.
    filter =
      new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.NOT_EQUAL, VALUE);
    testRowsFilteredMetric(baseScan, filter, ROWS.length);

    List<Filter> filters = new ArrayList<>();
    filters.add(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROWS[0])));
    filters.add(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROWS[3])));
    int numberOfMatchingRowFilters = filters.size();
    filter = new FilterList(Operator.MUST_PASS_ONE, filters);
    testRowsFilteredMetric(baseScan, filter, ROWS.length - numberOfMatchingRowFilters);
    filters.clear();

    // Add a single column value exclude filter for each column... The net effect is that all
    // columns will be excluded when scanning on the server side. This will result in an empty cell
    // array in RegionScanner#nextInternal which should be interpreted as a row being filtered.
    for (int family = 0; family < FAMILIES.length; family++) {
      for (int qualifier = 0; qualifier < QUALIFIERS.length; qualifier++) {
        filters.add(new SingleColumnValueExcludeFilter(FAMILIES[family], QUALIFIERS[qualifier],
          CompareOperator.EQUAL, VALUE));
      }
    }
    filter = new FilterList(Operator.MUST_PASS_ONE, filters);
    testRowsFilteredMetric(baseScan, filter, ROWS.length);
  }

  public void testRowsFilteredMetric(Scan baseScan, Filter filter, int expectedNumFiltered)
    throws Exception {
    Scan scan = new Scan(baseScan);
    if (filter != null) {
      scan.setFilter(filter);
    }
    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME,
      expectedNumFiltered);
  }

  /**
   * Run the scan to completion and check the metric against the specified value
   * @param scan          The scan instance to use to record metrics
   * @param metricKey     The metric key name
   * @param expectedValue The expected value of metric
   * @throws Exception on unexpected failure
   */
  public void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception {
    testMetric(scan, metricKey, expectedValue, CompareOperator.EQUAL);
  }

  private void testMetric(Scan scan, String metricKey, long expectedValue,
    CompareOperator compareOperator) throws Exception {
    assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled());
    ResultScanner scanner = TABLE.getScanner(scan);
    // Iterate through all the results
    while (scanner.next() != null) {
      continue;
    }
    scanner.close();
    ScanMetrics metrics = scanner.getScanMetrics();
    assertNotNull("Metrics are null", metrics);
    assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey));
    final long actualMetricValue = metrics.getCounter(metricKey).get();
    if (compareOperator == CompareOperator.EQUAL) {
      assertEquals(
        "Metric: " + metricKey + " Expected: " + expectedValue + " Actual: " + actualMetricValue,
        expectedValue, actualMetricValue);
    } else {
      assertTrue(
        "Metric: " + metricKey + " Expected: > " + expectedValue + " Actual: " + actualMetricValue,
        actualMetricValue > expectedValue);
    }
  }
}