001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS; 022import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS; 023import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNotNull; 027import static org.junit.Assert.assertTrue; 028import static org.mockito.ArgumentMatchers.any; 029import static org.mockito.Mockito.argThat; 030import static org.mockito.Mockito.atLeast; 031import static org.mockito.Mockito.doAnswer; 032import static org.mockito.Mockito.mock; 033import static org.mockito.Mockito.times; 034import static org.mockito.Mockito.verify; 035 036import java.io.IOException; 037import java.util.Arrays; 038import java.util.Optional; 039import java.util.concurrent.ExecutorService; 040import java.util.concurrent.Executors; 041import java.util.concurrent.atomic.AtomicInteger; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.hbase.Cell; 044import org.apache.hadoop.hbase.CellBuilderFactory; 045import org.apache.hadoop.hbase.CellBuilderType; 046import org.apache.hadoop.hbase.HBaseClassTestRule; 047import org.apache.hadoop.hbase.HBaseConfiguration; 048import org.apache.hadoop.hbase.HRegionLocation; 049import org.apache.hadoop.hbase.RegionLocations; 050import org.apache.hadoop.hbase.ServerName; 051import org.apache.hadoop.hbase.TableName; 052import org.apache.hadoop.hbase.ipc.HBaseRpcController; 053import org.apache.hadoop.hbase.security.User; 054import org.apache.hadoop.hbase.security.UserProvider; 055import org.apache.hadoop.hbase.testclassification.ClientTests; 056import org.apache.hadoop.hbase.testclassification.MediumTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.junit.Before; 059import org.junit.ClassRule; 060import org.junit.Rule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.junit.rules.TestName; 064import org.mockito.ArgumentMatcher; 065import org.mockito.invocation.InvocationOnMock; 066import org.mockito.stubbing.Answer; 067 068import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 069 070import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 072 073/** 074 * Test that correct rpc priority is sent to server from blocking Table calls. Currently only 075 * implements checks for scans, but more could be added here. 076 */ 077@Category({ ClientTests.class, MediumTests.class }) 078public class TestTableRpcPriority { 079 080 @ClassRule 081 public static final HBaseClassTestRule CLASS_RULE = 082 HBaseClassTestRule.forClass(TestTableRpcPriority.class); 083 084 @Rule 085 public TestName name = new TestName(); 086 087 private ClientProtos.ClientService.BlockingInterface stub; 088 private Connection conn; 089 090 @Before 091 public void setUp() throws IOException, ServiceException { 092 stub = mock(ClientProtos.ClientService.BlockingInterface.class); 093 094 Configuration conf = HBaseConfiguration.create(); 095 096 ExecutorService executorService = Executors.newCachedThreadPool(); 097 User user = UserProvider.instantiate(conf).getCurrent(); 098 conn = new ConnectionImplementation(conf, executorService, user, 099 new DoNothingConnectionRegistry(conf, user)) { 100 101 @Override 102 public ClientProtos.ClientService.BlockingInterface getClient(ServerName serverName) 103 throws IOException { 104 return stub; 105 } 106 107 @Override 108 public RegionLocations relocateRegion(final TableName tableName, final byte[] row, 109 int replicaId) throws IOException { 110 return locateRegion(tableName, row, true, false, replicaId); 111 } 112 113 @Override 114 public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, 115 boolean retry, int replicaId) throws IOException { 116 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 117 ServerName serverName = ServerName.valueOf("rs", 16010, 12345); 118 HRegionLocation loc = new HRegionLocation(info, serverName); 119 return new RegionLocations(loc); 120 } 121 }; 122 } 123 124 @Test 125 public void testScan() throws Exception { 126 mockScan(19); 127 testForTable(TableName.valueOf(name.getMethodName()), Optional.of(19)); 128 } 129 130 /** 131 * This test verifies that our closeScanner request honors the original priority of the scan if 132 * it's greater than our expected HIGH_QOS for close calls. 133 */ 134 @Test 135 public void testScanSuperHighPriority() throws Exception { 136 mockScan(1000); 137 testForTable(TableName.valueOf(name.getMethodName()), Optional.of(1000)); 138 } 139 140 @Test 141 public void testScanNormalTable() throws Exception { 142 mockScan(NORMAL_QOS); 143 testForTable(TableName.valueOf(name.getMethodName()), Optional.of(NORMAL_QOS)); 144 } 145 146 @Test 147 public void testScanSystemTable() throws Exception { 148 mockScan(SYSTEMTABLE_QOS); 149 testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()), 150 Optional.empty()); 151 } 152 153 @Test 154 public void testScanMetaTable() throws Exception { 155 mockScan(SYSTEMTABLE_QOS); 156 testForTable(TableName.META_TABLE_NAME, Optional.empty()); 157 } 158 159 private void testForTable(TableName tableName, Optional<Integer> priority) throws Exception { 160 Scan scan = new Scan().setCaching(1); 161 priority.ifPresent(scan::setPriority); 162 163 try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) { 164 assertNotNull(scanner.next()); 165 assertNotNull(scanner.next()); 166 } 167 168 // just verify that the calls happened. verification of priority occurred in the mocking 169 // open, next, then several renew lease 170 verify(stub, atLeast(3)).scan(any(), any(ClientProtos.ScanRequest.class)); 171 verify(stub, times(1)).scan(assertControllerArgs(Math.max(priority.orElse(0), HIGH_QOS)), 172 assertScannerCloseRequest()); 173 } 174 175 private void mockScan(int scanPriority) throws ServiceException { 176 int scannerId = 1; 177 178 doAnswer(new Answer<ClientProtos.ScanResponse>() { 179 @Override 180 public ClientProtos.ScanResponse answer(InvocationOnMock invocation) throws Throwable { 181 throw new IllegalArgumentException( 182 "Call not covered by explicit mock for arguments controller=" + invocation.getArgument(0) 183 + ", request=" + invocation.getArgument(1)); 184 } 185 }).when(stub).scan(any(), any()); 186 187 AtomicInteger scanNextCalled = new AtomicInteger(0); 188 doAnswer(new Answer<ClientProtos.ScanResponse>() { 189 190 @Override 191 public ClientProtos.ScanResponse answer(InvocationOnMock invocation) throws Throwable { 192 ClientProtos.ScanRequest req = invocation.getArgument(1); 193 assertFalse("close scanner should not come in with scan priority " + scanPriority, 194 req.hasCloseScanner() && req.getCloseScanner()); 195 ClientProtos.ScanResponse.Builder builder = ClientProtos.ScanResponse.newBuilder(); 196 197 if (!req.hasScannerId()) { 198 builder.setScannerId(scannerId); 199 } else { 200 builder.setScannerId(req.getScannerId()); 201 } 202 203 Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) 204 .setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())).setFamily(Bytes.toBytes("cf")) 205 .setQualifier(Bytes.toBytes("cq")).setValue(Bytes.toBytes("v")).build(); 206 Result result = Result.create(Arrays.asList(cell)); 207 return builder.setTtl(800).setMoreResultsInRegion(true).setMoreResults(true) 208 .addResults(ProtobufUtil.toResult(result)).build(); 209 } 210 }).when(stub).scan(assertControllerArgs(scanPriority), any()); 211 212 doAnswer(new Answer<ClientProtos.ScanResponse>() { 213 214 @Override 215 public ClientProtos.ScanResponse answer(InvocationOnMock invocation) throws Throwable { 216 ClientProtos.ScanRequest req = invocation.getArgument(1); 217 assertTrue("close request should have scannerId", req.hasScannerId()); 218 assertEquals("close request's scannerId should match", scannerId, req.getScannerId()); 219 assertTrue("close request should have closerScanner set", 220 req.hasCloseScanner() && req.getCloseScanner()); 221 222 return ClientProtos.ScanResponse.getDefaultInstance(); 223 } 224 }).when(stub).scan(assertControllerArgs(Math.max(scanPriority, HIGH_QOS)), 225 assertScannerCloseRequest()); 226 } 227 228 private HBaseRpcController assertControllerArgs(int priority) { 229 return argThat(new ArgumentMatcher<HBaseRpcController>() { 230 231 @Override 232 public boolean matches(HBaseRpcController controller) { 233 // check specified priority, but also check that it has a timeout 234 // this ensures that our conversion from the base controller to the close-specific 235 // controller honored the original arguments. 236 return controller.getPriority() == priority && controller.hasCallTimeout(); 237 } 238 }); 239 } 240 241 private ClientProtos.ScanRequest assertScannerCloseRequest() { 242 return argThat(new ArgumentMatcher<ClientProtos.ScanRequest>() { 243 244 @Override 245 public boolean matches(ClientProtos.ScanRequest request) { 246 return request.hasCloseScanner() && request.getCloseScanner(); 247 } 248 }); 249 } 250}