001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.concurrent.ThreadLocalRandom; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 033import org.apache.hadoop.hbase.client.Put; 034import org.apache.hadoop.hbase.client.Result; 035import org.apache.hadoop.hbase.client.Scan; 036import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 037import org.apache.hadoop.hbase.filter.Filter; 038import org.apache.hadoop.hbase.filter.FilterBase; 039import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType; 040import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 041import org.apache.hadoop.hbase.testclassification.RegionServerTests; 042import org.apache.hadoop.hbase.testclassification.SmallTests; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.junit.After; 045import org.junit.Before; 046import org.junit.ClassRule; 047import org.junit.Ignore; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050 051@Category({ RegionServerTests.class, SmallTests.class }) 052public class TestSwitchToStreamRead { 053 054 @ClassRule 055 public static final HBaseClassTestRule CLASS_RULE = 056 HBaseClassTestRule.forClass(TestSwitchToStreamRead.class); 057 058 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 059 060 private static TableName TABLE_NAME = TableName.valueOf("stream"); 061 062 private static byte[] FAMILY = Bytes.toBytes("cf"); 063 064 private static byte[] QUAL = Bytes.toBytes("cq"); 065 066 private static String VALUE_PREFIX; 067 068 private static HRegion REGION; 069 070 @Before 071 public void setUp() throws IOException { 072 UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048); 073 StringBuilder sb = new StringBuilder(256); 074 for (int i = 0; i < 255; i++) { 075 sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1)); 076 } 077 VALUE_PREFIX = sb.append("-").toString(); 078 REGION = UTIL.createLocalHRegion( 079 TableDescriptorBuilder.newBuilder(TABLE_NAME) 080 .setColumnFamily( 081 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build()) 082 .build(), 083 null, null); 084 for (int i = 0; i < 900; i++) { 085 REGION 086 .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i))); 087 } 088 REGION.flush(true); 089 for (int i = 900; i < 1000; i++) { 090 REGION 091 .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i))); 092 } 093 } 094 095 @After 096 public void tearDown() throws IOException { 097 REGION.close(true); 098 UTIL.cleanupTestDir(); 099 } 100 101 @Test 102 public void test() throws IOException { 103 try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) { 104 StoreScanner storeScanner = (StoreScanner) scanner.storeHeap.getCurrentForTesting(); 105 for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) { 106 if (kvs instanceof StoreFileScanner) { 107 StoreFileScanner sfScanner = (StoreFileScanner) kvs; 108 // starting from pread so we use shared reader here. 109 assertTrue(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD); 110 } 111 } 112 List<Cell> cells = new ArrayList<>(); 113 for (int i = 0; i < 500; i++) { 114 assertTrue(scanner.next(cells)); 115 Result result = Result.create(cells); 116 assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL))); 117 cells.clear(); 118 scanner.shipped(); 119 } 120 for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) { 121 if (kvs instanceof StoreFileScanner) { 122 StoreFileScanner sfScanner = (StoreFileScanner) kvs; 123 // we should have convert to use stream read now. 124 assertFalse(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD); 125 } 126 } 127 for (int i = 500; i < 1000; i++) { 128 assertEquals(i != 999, scanner.next(cells)); 129 Result result = Result.create(cells); 130 assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL))); 131 cells.clear(); 132 scanner.shipped(); 133 } 134 } 135 // make sure all scanners are closed. 136 for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) { 137 assertFalse(sf.isReferencedInReads()); 138 } 139 } 140 141 public static final class MatchLastRowKeyFilter extends FilterBase { 142 143 @Override 144 public boolean filterRowKey(Cell cell) throws IOException { 145 return Bytes.toInt(cell.getRowArray(), cell.getRowOffset()) != 999; 146 } 147 } 148 149 private void testFilter(Filter filter) throws IOException { 150 try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setFilter(filter))) { 151 StoreScanner storeScanner = (StoreScanner) scanner.storeHeap.getCurrentForTesting(); 152 for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) { 153 if (kvs instanceof StoreFileScanner) { 154 StoreFileScanner sfScanner = (StoreFileScanner) kvs; 155 // starting from pread so we use shared reader here. 156 assertTrue(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD); 157 } 158 } 159 List<Cell> cells = new ArrayList<>(); 160 // should return before finishing the scan as we want to switch from pread to stream 161 assertTrue(scanner.next(cells, 162 ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build())); 163 assertTrue(cells.isEmpty()); 164 scanner.shipped(); 165 166 for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) { 167 if (kvs instanceof StoreFileScanner) { 168 StoreFileScanner sfScanner = (StoreFileScanner) kvs; 169 // we should have convert to use stream read now. 170 assertFalse(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD); 171 } 172 } 173 assertFalse(scanner.next(cells, 174 ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build())); 175 Result result = Result.create(cells); 176 assertEquals(VALUE_PREFIX + 999, Bytes.toString(result.getValue(FAMILY, QUAL))); 177 cells.clear(); 178 scanner.shipped(); 179 } 180 // make sure all scanners are closed. 181 for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) { 182 assertFalse(sf.isReferencedInReads()); 183 } 184 } 185 186 // We use a different logic to implement filterRowKey, where we will keep calling kvHeap.next 187 // until the row key is changed. And there we can only use NoLimitScannerContext so we can not 188 // make the upper layer return immediately. Simply do not use NoLimitScannerContext will lead to 189 // an infinite loop. Need to dig more, the code are way too complicated... 190 @Ignore 191 @Test 192 public void testFilterRowKey() throws IOException { 193 testFilter(new MatchLastRowKeyFilter()); 194 } 195 196 public static final class MatchLastRowCellNextColFilter extends FilterBase { 197 198 @Override 199 public ReturnCode filterCell(Cell c) throws IOException { 200 if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) { 201 return ReturnCode.INCLUDE; 202 } else { 203 return ReturnCode.NEXT_COL; 204 } 205 } 206 } 207 208 @Test 209 public void testFilterCellNextCol() throws IOException { 210 testFilter(new MatchLastRowCellNextColFilter()); 211 } 212 213 public static final class MatchLastRowCellNextRowFilter extends FilterBase { 214 215 @Override 216 public ReturnCode filterCell(Cell c) throws IOException { 217 if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) { 218 return ReturnCode.INCLUDE; 219 } else { 220 return ReturnCode.NEXT_ROW; 221 } 222 } 223 } 224 225 @Test 226 public void testFilterCellNextRow() throws IOException { 227 testFilter(new MatchLastRowCellNextRowFilter()); 228 } 229 230 public static final class MatchLastRowFilterRowFilter extends FilterBase { 231 232 private boolean exclude; 233 234 @Override 235 public void filterRowCells(List<Cell> kvs) throws IOException { 236 Cell c = kvs.get(0); 237 exclude = Bytes.toInt(c.getRowArray(), c.getRowOffset()) != 999; 238 } 239 240 @Override 241 public void reset() throws IOException { 242 exclude = false; 243 } 244 245 @Override 246 public boolean filterRow() throws IOException { 247 return exclude; 248 } 249 250 @Override 251 public boolean hasFilterRow() { 252 return true; 253 } 254 } 255 256 @Test 257 public void testFilterRow() throws IOException { 258 testFilter(new MatchLastRowFilterRowFilter()); 259 } 260}