001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Random; 026import java.util.concurrent.atomic.AtomicInteger; 027import java.util.concurrent.atomic.AtomicReference; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.client.Append; 030import org.apache.hadoop.hbase.client.CheckAndMutate; 031import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 032import org.apache.hadoop.hbase.client.Connection; 033import org.apache.hadoop.hbase.client.ConnectionFactory; 034import org.apache.hadoop.hbase.client.Delete; 035import org.apache.hadoop.hbase.client.Get; 036import org.apache.hadoop.hbase.client.Increment; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.client.ResultScanner; 039import org.apache.hadoop.hbase.client.RowMutations; 040import org.apache.hadoop.hbase.client.Scan; 041import org.apache.hadoop.hbase.client.Table; 042import org.apache.hadoop.hbase.client.TableDescriptor; 043import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 044import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController; 045import org.apache.hadoop.hbase.ipc.HBaseRpcController; 046import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 047import org.apache.hadoop.hbase.ipc.RpcServer; 048import org.apache.hadoop.hbase.regionserver.HRegionServer; 049import org.apache.hadoop.hbase.regionserver.RSRpcServices; 050import org.apache.hadoop.hbase.testclassification.ClientTests; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.junit.BeforeClass; 054import org.junit.ClassRule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057 058import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 059import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 060 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 062 063/** 064 * Tests that one can implement their own RpcControllerFactory and expect it to successfully pass 065 * custom priority values to the server for all HTable calls. 066 */ 067@Category({ ClientTests.class, MediumTests.class }) 068public class TestCustomPriorityRpcControllerFactory { 069 070 @ClassRule 071 public static final HBaseClassTestRule CLASS_RULE = 072 HBaseClassTestRule.forClass(TestCustomPriorityRpcControllerFactory.class); 073 074 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 075 076 private static final AtomicReference<State> STATE = new AtomicReference<>(State.SETUP); 077 private static final AtomicInteger EXPECTED_PRIORITY = new AtomicInteger(); 078 079 private enum State { 080 SETUP, 081 WAITING, 082 SUCCESS 083 } 084 085 private static final TableName TABLE_NAME = TableName.valueOf("Timeout"); 086 private static final byte[] FAMILY = Bytes.toBytes("family"); 087 private static final byte[] ROW = Bytes.toBytes("row"); 088 private static final byte[] QUALIFIER = Bytes.toBytes("qualifier"); 089 private static final byte[] VALUE = Bytes.toBytes(1L); 090 091 private static final int MIN_CUSTOM_PRIORITY = 201; 092 093 private static Connection CONN; 094 private static Table TABLE; 095 096 @BeforeClass 097 public static void setUpClass() throws Exception { 098 // Set RegionServer class and use default values for other options. 099 UTIL.startMiniCluster( 100 StartTestingClusterOption.builder().rsClass(PriorityRegionServer.class).build()); 101 TableDescriptor descriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME) 102 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); 103 UTIL.getAdmin().createTable(descriptor); 104 105 Configuration conf = new Configuration(UTIL.getConfiguration()); 106 conf.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, 107 PriorityRpcControllerFactory.class, RpcControllerFactory.class); 108 CONN = ConnectionFactory.createConnection(conf); 109 TABLE = CONN.getTable(TABLE_NAME); 110 } 111 112 @Test 113 public void tetGetPriority() throws Exception { 114 testForCall(new ThrowingCallable() { 115 @Override 116 public void call() throws IOException { 117 TABLE.get(new Get(ROW)); 118 } 119 }); 120 } 121 122 @Test 123 public void testDeletePriority() throws Exception { 124 testForCall(new ThrowingCallable() { 125 @Override 126 public void call() throws IOException { 127 TABLE.delete(new Delete(ROW)); 128 } 129 }); 130 } 131 132 @Test 133 public void testIncrementPriority() throws Exception { 134 testForCall(new ThrowingCallable() { 135 @Override 136 public void call() throws IOException { 137 TABLE.increment(new Increment(ROW).addColumn(FAMILY, QUALIFIER, 1)); 138 } 139 }); 140 } 141 142 @Test 143 public void testAppendPriority() throws Exception { 144 testForCall(new ThrowingCallable() { 145 @Override 146 public void call() throws IOException { 147 TABLE.append(new Append(ROW).addColumn(FAMILY, QUALIFIER, VALUE)); 148 } 149 }); 150 } 151 152 @Test 153 public void testPutPriority() throws Exception { 154 testForCall(new ThrowingCallable() { 155 @Override 156 public void call() throws IOException { 157 Put put = new Put(ROW); 158 put.addColumn(FAMILY, QUALIFIER, VALUE); 159 TABLE.put(put); 160 } 161 }); 162 163 } 164 165 @Test 166 public void testExistsPriority() throws Exception { 167 testForCall(new ThrowingCallable() { 168 @Override 169 public void call() throws IOException { 170 TABLE.exists(new Get(ROW)); 171 } 172 }); 173 } 174 175 @Test 176 public void testMutatePriority() throws Exception { 177 testForCall(new ThrowingCallable() { 178 @Override 179 public void call() throws IOException { 180 RowMutations mutation = new RowMutations(ROW); 181 mutation.add(new Delete(ROW)); 182 mutation.add(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)); 183 TABLE.mutateRow(mutation); 184 } 185 }); 186 } 187 188 @Test 189 public void testCheckAndMutatePriority() throws Exception { 190 testForCall(new ThrowingCallable() { 191 @Override 192 public void call() throws IOException { 193 RowMutations mutation = new RowMutations(ROW); 194 mutation.add(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)); 195 TABLE.checkAndMutate( 196 CheckAndMutate.newBuilder(ROW).ifNotExists(FAMILY, QUALIFIER).build(mutation)); 197 } 198 }); 199 } 200 201 @Test 202 public void testMultiGetsPriority() throws Exception { 203 testForCall(new ThrowingCallable() { 204 @Override 205 public void call() throws Exception { 206 Get get1 = new Get(ROW); 207 get1.addColumn(FAMILY, QUALIFIER); 208 Get get2 = new Get(ROW); 209 get2.addColumn(FAMILY, QUALIFIER); 210 List<Get> gets = new ArrayList<>(); 211 gets.add(get1); 212 gets.add(get2); 213 TABLE.batch(gets, new Object[2]); 214 } 215 }); 216 } 217 218 @Test 219 public void testMultiPutsPriority() throws Exception { 220 testForCall(new ThrowingCallable() { 221 @Override 222 public void call() throws Exception { 223 Put put1 = new Put(ROW); 224 put1.addColumn(FAMILY, QUALIFIER, VALUE); 225 Put put2 = new Put(ROW); 226 put2.addColumn(FAMILY, QUALIFIER, VALUE); 227 List<Put> puts = new ArrayList<>(); 228 puts.add(put1); 229 puts.add(put2); 230 TABLE.batch(puts, new Object[2]); 231 } 232 }); 233 } 234 235 @Test 236 public void testScanPriority() throws Exception { 237 testForCall(new ThrowingCallable() { 238 @Override 239 public void call() throws IOException { 240 ResultScanner scanner = TABLE.getScanner(new Scan()); 241 scanner.next(); 242 } 243 }); 244 } 245 246 private void testForCall(ThrowingCallable callable) throws Exception { 247 STATE.set(State.WAITING); 248 // set it higher than MIN_CUSTOM_PRIORITY so we can ignore calls for meta, setup, etc 249 EXPECTED_PRIORITY.set(new Random().nextInt(MIN_CUSTOM_PRIORITY) + MIN_CUSTOM_PRIORITY); 250 callable.call(); 251 252 assertEquals("Expected state to change to SUCCESS. Check for assertion error in logs", 253 STATE.get(), State.SUCCESS); 254 } 255 256 private interface ThrowingCallable { 257 void call() throws Exception; 258 } 259 260 public static class PriorityRegionServer 261 extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer { 262 public PriorityRegionServer(Configuration conf) throws IOException, InterruptedException { 263 super(conf); 264 } 265 266 @Override 267 protected RSRpcServices createRpcServices() throws IOException { 268 return new PriorityRpcServices(this); 269 } 270 } 271 272 public static class PriorityRpcControllerFactory extends RpcControllerFactory { 273 274 public PriorityRpcControllerFactory(Configuration conf) { 275 super(conf); 276 } 277 278 @Override 279 public HBaseRpcController newController() { 280 return new PriorityController(EXPECTED_PRIORITY.get(), super.newController()); 281 } 282 283 @Override 284 public HBaseRpcController newController(ExtendedCellScanner cellScanner) { 285 return new PriorityController(EXPECTED_PRIORITY.get(), super.newController(cellScanner)); 286 } 287 288 @Override 289 public HBaseRpcController newController(List<ExtendedCellScannable> cellIterables) { 290 return new PriorityController(EXPECTED_PRIORITY.get(), super.newController(cellIterables)); 291 } 292 } 293 294 private static class PriorityController extends DelegatingHBaseRpcController { 295 private final int priority; 296 297 public PriorityController(int priority, HBaseRpcController controller) { 298 super(controller); 299 this.priority = priority; 300 } 301 302 @Override 303 public int getPriority() { 304 return priority; 305 } 306 } 307 308 public static class PriorityRpcServices extends RSRpcServices { 309 PriorityRpcServices(HRegionServer rs) throws IOException { 310 super(rs); 311 } 312 313 private void checkPriorityIfWaiting() { 314 if (STATE.get() == State.WAITING) { 315 int priority = RpcServer.getCurrentCall().get().getPriority(); 316 if (priority < MIN_CUSTOM_PRIORITY) { 317 return; 318 } 319 assertEquals(EXPECTED_PRIORITY.get(), priority); 320 STATE.set(State.SUCCESS); 321 } 322 } 323 324 @Override 325 public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request) 326 throws ServiceException { 327 checkPriorityIfWaiting(); 328 return super.get(controller, request); 329 } 330 331 @Override 332 public ClientProtos.MutateResponse mutate(RpcController rpcc, 333 ClientProtos.MutateRequest request) throws ServiceException { 334 checkPriorityIfWaiting(); 335 return super.mutate(rpcc, request); 336 } 337 338 @Override 339 public ClientProtos.ScanResponse scan(RpcController controller, 340 ClientProtos.ScanRequest request) throws ServiceException { 341 checkPriorityIfWaiting(); 342 return super.scan(controller, request); 343 } 344 345 @Override 346 public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request) 347 throws ServiceException { 348 checkPriorityIfWaiting(); 349 return super.multi(rpcc, request); 350 } 351 } 352}