001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertNull; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.Optional; 025import java.util.concurrent.atomic.AtomicReference; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HBaseClassTestRule; 028import org.apache.hadoop.hbase.HBaseTestingUtil; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Admin; 033import org.apache.hadoop.hbase.client.Put; 034import org.apache.hadoop.hbase.client.Result; 035import org.apache.hadoop.hbase.client.ResultScanner; 036import org.apache.hadoop.hbase.client.Scan; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.io.ByteBuffAllocator; 039import org.apache.hadoop.hbase.io.DeallocateRewriteByteBuffAllocator; 040import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 041import org.apache.hadoop.hbase.ipc.RpcCall; 042import org.apache.hadoop.hbase.ipc.RpcServer; 043import org.apache.hadoop.hbase.testclassification.LargeTests; 044import org.apache.hadoop.hbase.testclassification.RegionServerTests; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.junit.AfterClass; 047import org.junit.BeforeClass; 048import org.junit.ClassRule; 049import org.junit.Rule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052import org.junit.rules.TestName; 053 054import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 055import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 056 057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 059 060@Category({ RegionServerTests.class, LargeTests.class }) 061public class TestRegionServerScan { 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestRegionServerScan.class); 065 066 @Rule 067 public TestName name = new TestName(); 068 069 private static final byte[] CF = Bytes.toBytes("CF"); 070 private static final byte[] CQ = Bytes.toBytes("CQ"); 071 private static final byte[] VALUE = new byte[1200]; 072 073 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 074 private static final Configuration conf = TEST_UTIL.getConfiguration(); 075 private static Admin admin = null; 076 static final TableName tableName = TableName.valueOf("TestRegionServerScan"); 077 static final byte[] r0 = Bytes.toBytes("row-0"); 078 static final byte[] r1 = Bytes.toBytes("row-1"); 079 static final byte[] r2 = Bytes.toBytes("row-2"); 080 081 @BeforeClass 082 public static void setupBeforeClass() throws Exception { 083 /** 084 * Use {@link DeallocateRewriteByteBuffAllocator} to rewrite the bytebuffers right after 085 * released. 086 */ 087 conf.set(ByteBuffAllocator.BYTEBUFF_ALLOCATOR_CLASS, 088 DeallocateRewriteByteBuffAllocator.class.getName()); 089 conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true); 090 conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0); 091 conf.setInt(BlockCacheFactory.BUCKET_CACHE_WRITER_THREADS_KEY, 20); 092 conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 2048); 093 conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); 094 conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64); 095 conf.setStrings(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName()); 096 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 60 * 1000); 097 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30 * 60 * 1000); 098 099 conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 60 * 1000); 100 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 101 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10000); 102 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1024 * 1024 * 1024); 103 TEST_UTIL.startMiniCluster(1); 104 admin = TEST_UTIL.getAdmin(); 105 } 106 107 @AfterClass 108 public static void tearDownAfterClass() throws Exception { 109 TEST_UTIL.shutdownMiniCluster(); 110 } 111 112 @Test 113 public void testScannWhenRpcCallContextNull() throws Exception { 114 ResultScanner resultScanner = null; 115 Table table = null; 116 try { 117 table = TEST_UTIL.createTable(tableName, new byte[][] { CF }, 1, 1024, null); 118 putToTable(table, r0); 119 putToTable(table, r1); 120 putToTable(table, r2); 121 122 admin.flush(table.getName()); 123 124 Scan scan = new Scan(); 125 scan.setCaching(2); 126 scan.withStartRow(r0, true).withStopRow(r2, true); 127 128 MyRSRpcServices.inTest = true; 129 resultScanner = table.getScanner(scan); 130 Result result = resultScanner.next(); 131 byte[] rowKey = result.getRow(); 132 assertTrue(Bytes.equals(r0, rowKey)); 133 134 result = resultScanner.next(); 135 rowKey = result.getRow(); 136 assertTrue(Bytes.equals(r1, rowKey)); 137 138 result = resultScanner.next(); 139 rowKey = result.getRow(); 140 assertTrue(Bytes.equals(r2, rowKey)); 141 assertNull(resultScanner.next()); 142 assertTrue(MyRSRpcServices.exceptionRef.get() == null); 143 } finally { 144 MyRSRpcServices.inTest = false; 145 if (resultScanner != null) { 146 resultScanner.close(); 147 } 148 if (table != null) { 149 table.close(); 150 } 151 } 152 } 153 154 private static void putToTable(Table table, byte[] rowkey) throws IOException { 155 Put put = new Put(rowkey); 156 put.addColumn(CF, CQ, VALUE); 157 table.put(put); 158 } 159 160 private static class MyRegionServer extends MiniHBaseClusterRegionServer { 161 public MyRegionServer(Configuration conf) throws IOException, InterruptedException { 162 super(conf); 163 } 164 165 @Override 166 protected RSRpcServices createRpcServices() throws IOException { 167 return new MyRSRpcServices(this); 168 } 169 } 170 171 private static class MyRSRpcServices extends RSRpcServices { 172 private static AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(null); 173 private static volatile boolean inTest = false; 174 175 public MyRSRpcServices(HRegionServer rs) throws IOException { 176 super(rs); 177 } 178 179 @Override 180 public ScanResponse scan(RpcController controller, ScanRequest request) 181 throws ServiceException { 182 try { 183 if (!inTest) { 184 return super.scan(controller, request); 185 } 186 187 HRegion region = null; 188 if (request.hasRegion()) { 189 region = this.getRegion(request.getRegion()); 190 } 191 192 if (region != null && !tableName.equals(region.getTableDescriptor().getTableName())) { 193 return super.scan(controller, request); 194 } 195 196 ScanResponse result = null; 197 // Simulate RpcCallContext is null for test. 198 Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall(); 199 try { 200 result = super.scan(controller, request); 201 } finally { 202 rpcCall.ifPresent(RpcServer::setCurrentCall); 203 } 204 return result; 205 } catch (Throwable e) { 206 exceptionRef.set(e); 207 throw new ServiceException(e); 208 } 209 } 210 } 211 212}