/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Unit tests for {@link SimpleRequestController} and its nested checkers.
 * <p>
 * The fixtures below define three region locations: {@code LOC1} and {@code LOC2} are hosted on
 * server {@code SN}, while {@code LOC3} is hosted on server {@code SN2}. Several tests rely on
 * that split to tell per-server behaviour apart from per-region behaviour.
 */
@Category({ ClientTests.class, SmallTests.class })
public class TestSimpleRequestController {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSimpleRequestController.class);

  private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
  private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1");
  private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2");
  private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3");
  private static final ServerName SN = ServerName.valueOf("s1,1,1");
  private static final ServerName SN2 = ServerName.valueOf("s2,2,2");
  private static final RegionInfo HRI1 = RegionInfoBuilder.newBuilder(DUMMY_TABLE)
    .setStartKey(DUMMY_BYTES_1).setEndKey(DUMMY_BYTES_2).setRegionId(1).build();
  private static final RegionInfo HRI2 = RegionInfoBuilder.newBuilder(DUMMY_TABLE)
    .setStartKey(DUMMY_BYTES_2).setEndKey(HConstants.EMPTY_END_ROW).setRegionId(2).build();
  private static final RegionInfo HRI3 = RegionInfoBuilder.newBuilder(DUMMY_TABLE)
    .setStartKey(DUMMY_BYTES_3).setEndKey(HConstants.EMPTY_END_ROW).setRegionId(3).build();
  private static final HRegionLocation LOC1 = new HRegionLocation(HRI1, SN);
  private static final HRegionLocation LOC2 = new HRegionLocation(HRI2, SN);
  private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2);

  @Test
  public void testIllegalRequestHeapSize() {
    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
  }

  @Test
  public void testIllegalRsTasks() {
    testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -1);
  }

  @Test
  public void testIllegalRegionTasks() {
    testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -1);
  }

  @Test
  public void testIllegalSubmittedSize() {
    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, -1);
  }

  @Test
  public void testIllegalRequestRows() {
    testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_ROWS, -1);
  }

  /**
   * Asserts that building a {@link SimpleRequestController} from a configuration in which
   * {@code key} is set to {@code value} is rejected with an {@link IllegalArgumentException}.
   * @param key   the configuration key to set
   * @param value the (illegal) value to store under {@code key}
   */
  private void testIllegalArgument(String key, long value) {
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(key, value);
    try {
      new SimpleRequestController(conf);
      fail("The " + key + " must be bigger than zero");
    } catch (IllegalArgumentException expected) {
      // Expected: the constructor must reject the illegal value.
    }
  }

  /**
   * Creates a {@link Put} for row {@code "row"} whose {@link Put#heapSize()} is overridden to
   * report exactly the given value, so tests can control the size seen by the checkers.
   * @param maxHeapSizePerRequest the heap size the returned {@link Put} reports
   * @return a {@link Put} reporting {@code maxHeapSizePerRequest} as its heap size
   */
  private static Put createPut(long maxHeapSizePerRequest) {
    return new Put(Bytes.toBytes("row")) {
      @Override
      public long heapSize() {
        return maxHeapSizePerRequest;
      }
    };
  }

  /**
   * Exercises the composite checker returned by {@code SimpleRequestController.newChecker} when
   * it wraps both a task-count checker and a request-heap-size checker.
   */
  @Test
  public void testTaskCheckerHost() throws IOException {
    final int maxTotalConcurrentTasks = 100;
    final int maxConcurrentTasksPerServer = 2;
    final int maxConcurrentTasksPerRegion = 1;
    final AtomicLong tasksInProgress = new AtomicLong(0);
    final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
    final Map<byte[], AtomicInteger> taskCounterPerRegion = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    SimpleRequestController.TaskCountChecker countChecker =
      new SimpleRequestController.TaskCountChecker(maxTotalConcurrentTasks,
        maxConcurrentTasksPerServer, maxConcurrentTasksPerRegion, tasksInProgress,
        taskCounterPerServer, taskCounterPerRegion);
    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
    // The size limit equals the heap size of one test Put, so a single row uses the whole budget.
    SimpleRequestController.RequestHeapSizeChecker sizeChecker =
      new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);
    RequestController.Checker checker =
      SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker));
    ReturnCode loc1Code = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
    assertEquals(ReturnCode.INCLUDE, loc1Code);

    ReturnCode loc1Code_2 = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
    // rejected for size
    assertNotEquals(ReturnCode.INCLUDE, loc1Code_2);

    ReturnCode loc2Code = checker.canTakeRow(LOC2, createPut(maxHeapSizePerRequest));
    // rejected for size
    assertNotEquals(ReturnCode.INCLUDE, loc2Code);

    // fill the task slots for LOC3.
    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(100));
    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));

    ReturnCode loc3Code = checker.canTakeRow(LOC3, createPut(1L));
    // rejected for count
    assertNotEquals(ReturnCode.INCLUDE, loc3Code);

    // release the task slots for LOC3.
    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(0));
    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));

    ReturnCode loc3Code_2 = checker.canTakeRow(LOC3, createPut(1L));
    assertEquals(ReturnCode.INCLUDE, loc3Code_2);
  }

  /**
   * Verifies {@code RequestHeapSizeChecker}: querying alone does not consume budget, a notified
   * full-size operation blocks further operations for regions on the same server but not for a
   * region on another server, and {@code reset()} restores acceptance.
   */
  @Test
  public void testRequestHeapSizeChecker() throws IOException {
    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
    SimpleRequestController.RequestHeapSizeChecker checker =
      new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);

    // canTakeOperation without notifyFinal leaves the inner state unchanged.
    for (int i = 0; i != 10; ++i) {
      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
      assertEquals(ReturnCode.INCLUDE, code);
      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
      assertEquals(ReturnCode.INCLUDE, code);
    }

    // accept the data located on LOC1 region.
    ReturnCode acceptCode = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
    assertEquals(ReturnCode.INCLUDE, acceptCode);
    checker.notifyFinal(acceptCode, LOC1, maxHeapSizePerRequest);

    // the SN server reaches the limit, so both of its regions are rejected.
    for (int i = 0; i != 10; ++i) {
      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
      assertNotEquals(ReturnCode.INCLUDE, code);
      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
      assertNotEquals(ReturnCode.INCLUDE, code);
    }

    // the request to the SN2 server should be accepted.
    for (int i = 0; i != 10; ++i) {
      ReturnCode code = checker.canTakeOperation(LOC3, maxHeapSizePerRequest);
      assertEquals(ReturnCode.INCLUDE, code);
    }

    checker.reset();
    for (int i = 0; i != 10; ++i) {
      ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
      assertEquals(ReturnCode.INCLUDE, code);
      code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
      assertEquals(ReturnCode.INCLUDE, code);
    }
  }

  /**
   * Verifies {@code RequestRowsChecker}: after {@code maxRowCount} notified rows for a region on
   * {@code SN}, further rows for regions on {@code SN} are rejected while a region on
   * {@code SN2} is still accepted, and {@code reset()} restores acceptance.
   */
  @Test
  public void testRequestRowsChecker() throws IOException {
    final long maxRowCount = 100;
    SimpleRequestController.RequestRowsChecker checker =
      new SimpleRequestController.RequestRowsChecker(maxRowCount);

    final long heapSizeOfRow = 100; // unused
    // canTakeOperation without notifyFinal leaves the inner state unchanged.
    for (int i = 0; i != 10; ++i) {
      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
      assertEquals(ReturnCode.INCLUDE, code);
      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
      assertEquals(ReturnCode.INCLUDE, code);
    }

    // accept the data located on LOC1 region.
    for (int i = 0; i != maxRowCount; ++i) {
      ReturnCode acceptCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
      assertEquals(ReturnCode.INCLUDE, acceptCode);
      checker.notifyFinal(acceptCode, LOC1, heapSizeOfRow);
    }

    // the SN server reaches the limit, so both of its regions are rejected.
    for (int i = 0; i != 10; ++i) {
      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
      assertNotEquals(ReturnCode.INCLUDE, code);
      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
      assertNotEquals(ReturnCode.INCLUDE, code);
    }

    // the request to the SN2 server should be accepted.
    for (int i = 0; i != 10; ++i) {
      ReturnCode code = checker.canTakeOperation(LOC3, heapSizeOfRow);
      assertEquals(ReturnCode.INCLUDE, code);
    }

    checker.reset();
    for (int i = 0; i != 10; ++i) {
      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
      assertEquals(ReturnCode.INCLUDE, code);
      code = checker.canTakeOperation(LOC2, heapSizeOfRow);
      assertEquals(ReturnCode.INCLUDE, code);
    }
  }

  /**
   * Verifies {@code SubmittedSizeChecker}: once the notified size reaches the limit every
   * further operation returns {@link ReturnCode#END}, regardless of location, until
   * {@code reset()} is called.
   */
  @Test
  public void testSubmittedSizeChecker() {
    final long maxHeapSizeSubmit = 2 * 1024 * 1024;
    SimpleRequestController.SubmittedSizeChecker checker =
      new SimpleRequestController.SubmittedSizeChecker(maxHeapSizeSubmit);

    for (int i = 0; i != 10; ++i) {
      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
      assertEquals(ReturnCode.INCLUDE, include);
    }

    for (int i = 0; i != 10; ++i) {
      checker.notifyFinal(ReturnCode.INCLUDE, LOC1, maxHeapSizeSubmit);
    }

    for (int i = 0; i != 10; ++i) {
      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
      assertEquals(ReturnCode.END, include);
    }
    for (int i = 0; i != 10; ++i) {
      ReturnCode include = checker.canTakeOperation(LOC2, 100000);
      assertEquals(ReturnCode.END, include);
    }
    checker.reset();
    for (int i = 0; i != 10; ++i) {
      ReturnCode include = checker.canTakeOperation(LOC1, 100000);
      assertEquals(ReturnCode.INCLUDE, include);
    }
  }

  /**
   * Verifies {@code TaskCountChecker}: a region that was already accepted keeps being accepted
   * even after its task counters are filled, a region whose counters are full before it was
   * ever accepted is rejected, and {@code reset()} forgets previously accepted regions.
   */
  @Test
  public void testTaskCountChecker() throws InterruptedIOException {
    long heapSizeOfRow = 12345;
    int maxTotalConcurrentTasks = 100;
    int maxConcurrentTasksPerServer = 2;
    int maxConcurrentTasksPerRegion = 1;
    AtomicLong tasksInProgress = new AtomicLong(0);
    Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
    Map<byte[], AtomicInteger> taskCounterPerRegion = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    SimpleRequestController.TaskCountChecker checker =
      new SimpleRequestController.TaskCountChecker(maxTotalConcurrentTasks,
        maxConcurrentTasksPerServer, maxConcurrentTasksPerRegion, tasksInProgress,
        taskCounterPerServer, taskCounterPerRegion);

    // canTakeOperation without notifyFinal leaves the inner state unchanged.
    for (int i = 0; i != 10; ++i) {
      ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
      assertEquals(ReturnCode.INCLUDE, code);
    }
    // add LOC1 region.
    ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
    assertEquals(ReturnCode.INCLUDE, code);
    checker.notifyFinal(code, LOC1, heapSizeOfRow);

    // fill the task slots for LOC1.
    taskCounterPerRegion.put(LOC1.getRegion().getRegionName(), new AtomicInteger(100));
    taskCounterPerServer.put(LOC1.getServerName(), new AtomicInteger(100));

    // the region was previously accepted, so it must be accepted now.
    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
      ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
      assertEquals(ReturnCode.INCLUDE, includeCode);
      checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
    }

    // fill the task slots for LOC3.
    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(100));
    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));

    // no task slots.
    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
      ReturnCode excludeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
      assertNotEquals(ReturnCode.INCLUDE, excludeCode);
      checker.notifyFinal(excludeCode, LOC3, heapSizeOfRow);
    }

    // release the tasks for LOC3.
    taskCounterPerRegion.put(LOC3.getRegion().getRegionName(), new AtomicInteger(0));
    taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));

    // add LOC3 region.
    ReturnCode code3 = checker.canTakeOperation(LOC3, heapSizeOfRow);
    assertEquals(ReturnCode.INCLUDE, code3);
    checker.notifyFinal(code3, LOC3, heapSizeOfRow);

    // the region was previously accepted, so it must be accepted now.
    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
      ReturnCode includeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
      assertEquals(ReturnCode.INCLUDE, includeCode);
      checker.notifyFinal(includeCode, LOC3, heapSizeOfRow);
    }

    checker.reset();
    // the region was previously accepted,
    // but the checker has been reset and the task slots for LOC1 are full.
    // So it must be rejected now.
    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
      ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
      assertNotEquals(ReturnCode.INCLUDE, includeCode);
      checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
    }
  }

  /**
   * Verifies that {@code waitForMaximumCurrentTasks} returns immediately when no tasks are in
   * progress, and keeps the calling thread blocked while the in-progress count is above the
   * requested maximum.
   * <p>
   * The wait runs on a worker thread. Anything the worker throws is recorded and asserted on
   * from the test thread, because an assertion failure raised on the worker thread itself would
   * not be reported by JUnit.
   */
  @Test
  public void testWaitForMaximumCurrentTasks() throws Exception {
    final AtomicInteger max = new AtomicInteger(0);
    final CyclicBarrier barrier = new CyclicBarrier(2);
    final AtomicReference<Throwable> workerFailure = new AtomicReference<>();
    SimpleRequestController controller = new SimpleRequestController(HBaseConfiguration.create());
    final AtomicLong tasks = controller.tasksInProgress;
    Runnable runnable = () -> {
      try {
        barrier.await();
        controller.waitForMaximumCurrentTasks(max.get(), 123, 1, null);
      } catch (InterruptedIOException | InterruptedException | BrokenBarrierException
        | RuntimeException e) {
        // Hand the failure to the test thread; failing here would go unnoticed by JUnit.
        workerFailure.set(e);
      }
    };
    // First test that our runnable thread only exits when tasks is zero.
    Thread t = new Thread(runnable);
    t.start();
    barrier.await();
    t.join();
    Assert.assertNull("Worker thread failed", workerFailure.get());
    // Now assert we stay running if max == zero and tasks is > 0.
    barrier.reset();
    tasks.set(1000000);
    t = new Thread(runnable);
    t.start();
    barrier.await();
    while (tasks.get() > 0) {
      boolean alive = t.isAlive();
      if (!alive) {
        // Prefer reporting the worker's own exception, if it died because of one.
        Assert.assertNull("Worker thread failed", workerFailure.get());
      }
      assertTrue("Worker exited while tasks were still in progress", alive);
      tasks.decrementAndGet();
    }
    t.join();
    Assert.assertNull("Worker thread failed", workerFailure.get());
  }
}