001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.HashSet; 026import java.util.Set; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CompareOperator; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtil; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.client.RegionLocator; 036import org.apache.hadoop.hbase.client.Result; 037import org.apache.hadoop.hbase.client.ResultScanner; 038import org.apache.hadoop.hbase.client.Scan; 039import org.apache.hadoop.hbase.client.Table; 040import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 041import org.apache.hadoop.hbase.filter.BinaryComparator; 042import org.apache.hadoop.hbase.filter.ColumnPaginationFilter; 043import org.apache.hadoop.hbase.filter.QualifierFilter; 044import org.apache.hadoop.hbase.filter.RowFilter; 045import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; 046import org.apache.hadoop.hbase.filter.SkipFilter; 047import org.apache.hadoop.hbase.testclassification.LargeTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.junit.Before; 050import org.junit.BeforeClass; 051import org.junit.ClassRule; 052import org.junit.Test; 053import org.junit.experimental.categories.Category; 054 055@Category({ LargeTests.class }) 056public class TestScannerBlockSizeLimits { 057 058 @ClassRule 059 public static final HBaseClassTestRule CLASS_RULE = 060 HBaseClassTestRule.forClass(TestScannerBlockSizeLimits.class); 061 062 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 063 private static final TableName TABLE = TableName.valueOf("TestScannerBlockSizeLimits"); 064 private static final byte[] FAMILY1 = Bytes.toBytes("0"); 065 private static final byte[] FAMILY2 = Bytes.toBytes("1"); 066 067 private static final byte[] DATA = new byte[1000]; 068 private static final byte[][] FAMILIES = new byte[][] { FAMILY1, FAMILY2 }; 069 070 private static final byte[] COLUMN1 = Bytes.toBytes(0); 071 private static final byte[] COLUMN2 = Bytes.toBytes(1); 072 private static final byte[] COLUMN3 = Bytes.toBytes(2); 073 private static final byte[] COLUMN5 = Bytes.toBytes(5); 074 075 @BeforeClass 076 public static void setUp() throws Exception { 077 Configuration conf = TEST_UTIL.getConfiguration(); 078 conf.setInt(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 4200); 079 TEST_UTIL.startMiniCluster(1); 080 TEST_UTIL.createTable(TABLE, FAMILIES, 1, 2048); 081 createTestData(); 082 } 083 084 @Before 085 public void setupEach() throws Exception { 086 HRegionServer regionServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); 087 for (HRegion region : regionServer.getRegions(TABLE)) { 088 System.out.println("Clearing cache for region " + region.getRegionInfo().getEncodedName()); 089 regionServer.clearRegionBlockCache(region); 090 } 091 } 092 093 private static void createTestData() throws IOException, InterruptedException { 094 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE); 095 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 096 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(TABLE).getRegion(regionName); 097 098 for (int i = 1; i < 10; i++) { 099 // 5 columns per row, in 2 families 100 // Each column value is 1000 bytes, which is enough to fill a full block with row and header. 101 // So 5 blocks per row in FAMILY1 102 Put put = new Put(Bytes.toBytes(i)); 103 for (int j = 0; j < 6; j++) { 104 put.addColumn(FAMILY1, Bytes.toBytes(j), DATA); 105 } 106 107 // Additional block in FAMILY2 (notably smaller than block size) 108 put.addColumn(FAMILY2, COLUMN1, DATA); 109 110 region.put(put); 111 112 if (i % 2 == 0) { 113 region.flush(true); 114 } 115 } 116 117 // we've created 10 storefiles at this point, 5 per family 118 region.flush(true); 119 120 } 121 122 /** 123 * Simplest test that ensures we don't count block sizes too much. These 2 requested cells are in 124 * the same block, so should be returned in 1 request. If we mis-counted blocks, it'd come in 2 125 * requests. 126 */ 127 @Test 128 public void testSingleBlock() throws IOException { 129 Table table = TEST_UTIL.getConnection().getTable(TABLE); 130 131 ResultScanner scanner = 132 table.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1)).withStopRow(Bytes.toBytes(2)) 133 .addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2).setReadType(Scan.ReadType.STREAM)); 134 135 ScanMetrics metrics = scanner.getScanMetrics(); 136 137 scanner.next(100); 138 139 // we fetch 2 columns from 1 row, so about 2 blocks 140 assertEquals(4120, metrics.countOfBlockBytesScanned.get()); 141 assertEquals(1, metrics.countOfRowsScanned.get()); 142 assertEquals(1, metrics.countOfRPCcalls.get()); 143 } 144 145 /** 146 * Tests that we check size limit after filterRowKey. When filterRowKey, we call nextRow to skip 147 * to next row. This should be efficient in this case, but we still need to check size limits 148 * after each row is processed. So in this test, we accumulate some block IO reading row 1, then 149 * skip row 2 and should return early at that point. The next rpc call starts with row3 blocks 150 * loaded, so can return the whole row in one rpc. If we were not checking size limits, we'd have 151 * been able to load an extra row 3 cell into the first rpc and thus split row 3 across multiple 152 * Results. 153 */ 154 @Test 155 public void testCheckLimitAfterFilterRowKey() throws IOException { 156 157 Table table = TEST_UTIL.getConnection().getTable(TABLE); 158 159 ResultScanner scanner = table.getScanner(getBaseScan().addColumn(FAMILY1, COLUMN1) 160 .addColumn(FAMILY1, COLUMN2).addColumn(FAMILY1, COLUMN3).addFamily(FAMILY2) 161 .setFilter(new RowFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(2))))); 162 163 ScanMetrics metrics = scanner.getScanMetrics(); 164 165 boolean foundRow3 = false; 166 for (Result result : scanner) { 167 Set<Integer> rows = new HashSet<>(); 168 for (Cell cell : result.rawCells()) { 169 rows.add(Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); 170 } 171 if (rows.contains(3)) { 172 assertFalse("expected row3 to come all in one result, but found it in two results", 173 foundRow3); 174 assertEquals(1, rows.size()); 175 foundRow3 = true; 176 } 177 } 178 179 // 22 blocks, last one is 1030 bytes (approx 3 per row for 8 rows, but some compaction happens 180 // in family2 since each row only has 1 cell there and 2 can fit per block) 181 assertEquals(44290, metrics.countOfBlockBytesScanned.get()); 182 // We can return 22 blocks in 9 RPCs, but need an extra one to check for more rows at end 183 assertEquals(10, metrics.countOfRPCcalls.get()); 184 } 185 186 /** 187 * After RegionScannerImpl.populateResults, row filters are run. If row is excluded due to 188 * filter.filterRow(), nextRow() is called which might accumulate more block IO. Validates that in 189 * this case we still honor block limits. 190 */ 191 @Test 192 public void testCheckLimitAfterFilteringRowCellsDueToFilterRow() throws IOException { 193 Table table = TEST_UTIL.getConnection().getTable(TABLE); 194 195 ResultScanner scanner = table.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1), true) 196 .addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2).setReadType(Scan.ReadType.STREAM) 197 .setFilter(new SkipFilter(new QualifierFilter(CompareOperator.EQUAL, 198 new BinaryComparator(Bytes.toBytes("dasfasf")))))); 199 200 // Our filter doesn't end up matching any real columns, so expect only cursors 201 for (Result result : scanner) { 202 assertTrue(result.isCursor()); 203 } 204 205 ScanMetrics metrics = scanner.getScanMetrics(); 206 207 // scanning over 9 rows, filtering on 2 contiguous columns each, so 9 blocks total 208 assertEquals(18540, metrics.countOfBlockBytesScanned.get()); 209 // limited to 4200 bytes per which is enough for 3 blocks (exceed limit after loading 3rd) 210 // so that's 3 RPC and the last RPC pulls the cells loaded by the last block 211 assertEquals(4, metrics.countOfRPCcalls.get()); 212 } 213 214 /** 215 * At the end of the loop in StoreScanner, we do one more check of size limits. This is to catch 216 * block size being exceeded while filtering cells within a store. Test to ensure that we do that, 217 * otherwise we'd see no cursors below. 218 */ 219 @Test 220 public void testCheckLimitAfterFilteringCell() throws IOException { 221 Table table = TEST_UTIL.getConnection().getTable(TABLE); 222 223 ResultScanner scanner = table.getScanner(getBaseScan() 224 .setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(COLUMN2)))); 225 226 int cursors = 0; 227 for (Result result : scanner) { 228 if (result.isCursor()) { 229 cursors++; 230 } 231 } 232 ScanMetrics metrics = scanner.getScanMetrics(); 233 System.out.println(metrics.countOfBlockBytesScanned.get()); 234 235 // 9 rows, total of 32 blocks (last one is 1030) 236 assertEquals(64890, metrics.countOfBlockBytesScanned.get()); 237 // We can return 32 blocks in approx 11 RPCs but we need 2 cursors due to the narrow filter 238 assertEquals(2, cursors); 239 assertEquals(11, metrics.countOfRPCcalls.get()); 240 } 241 242 /** 243 * After RegionScannerImpl.populateResults, row filters are run. If row is excluded due to 244 * filter.filterRowCells(), we fall through to a final results.isEmpty() check near the end of the 245 * method. If results are empty at this point (which they are), nextRow() is called which might 246 * accumulate more block IO. Validates that in this case we still honor block limits. 247 */ 248 @Test 249 public void testCheckLimitAfterFilteringRowCells() throws IOException { 250 Table table = TEST_UTIL.getConnection().getTable(TABLE); 251 252 ResultScanner scanner = table 253 .getScanner(getBaseScan().withStartRow(Bytes.toBytes(1), true).addColumn(FAMILY1, COLUMN1) 254 .setReadType(Scan.ReadType.STREAM).setFilter(new SingleColumnValueExcludeFilter(FAMILY1, 255 COLUMN1, CompareOperator.EQUAL, new BinaryComparator(DATA)))); 256 257 // Since we use SingleColumnValueExcludeFilter and dont include any other columns, the column 258 // we load to test ends up being excluded from the result. So we only expect cursors here. 259 for (Result result : scanner) { 260 assertTrue(result.isCursor()); 261 } 262 263 ScanMetrics metrics = scanner.getScanMetrics(); 264 265 // Our filter causes us to read the first column of each row, then INCLUDE_AND_SEEK_NEXT_ROW. 266 // So we load 1 block per row, and there are 9 rows. So 9 blocks 267 assertEquals(18540, metrics.countOfBlockBytesScanned.get()); 268 // We can return 9 blocks in 3 RPCs, but need 1 more to check for more results (returns 0) 269 assertEquals(4, metrics.countOfRPCcalls.get()); 270 } 271 272 /** 273 * Tests that when we seek over blocks we dont include them in the block size of the request 274 */ 275 @Test 276 public void testSeekNextUsingHint() throws IOException { 277 Table table = TEST_UTIL.getConnection().getTable(TABLE); 278 279 ResultScanner scanner = table.getScanner( 280 getBaseScan().addFamily(FAMILY1).setFilter(new ColumnPaginationFilter(1, COLUMN5))); 281 282 scanner.next(100); 283 ScanMetrics metrics = scanner.getScanMetrics(); 284 285 // We have to read the first cell/block of each row, then can skip to the last block. So that's 286 // 2 blocks per row to read (18 blocks total) 287 assertEquals(37080, metrics.countOfBlockBytesScanned.get()); 288 // Our max scan size is enough to read 3 blocks per RPC, plus one final RPC to finish region. 289 assertEquals(7, metrics.countOfRPCcalls.get()); 290 } 291 292 /** 293 * We enable cursors and partial results to give us more granularity over counting of results, and 294 * we enable STREAM so that no auto switching from pread to stream occurs -- this throws off the 295 * rpc counts. 296 */ 297 private Scan getBaseScan() { 298 return new Scan().setScanMetricsEnabled(true).setNeedCursorResult(true) 299 .setAllowPartialResults(true).setReadType(Scan.ReadType.STREAM); 300 } 301}