001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.throttle; 019 020import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER; 021import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT; 022import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT; 023import static org.mockito.Mockito.mock; 024import static org.mockito.Mockito.when; 025 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.Executors; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicReference; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.RegionTooBusyException; 038import org.apache.hadoop.hbase.client.RegionInfo; 039import org.apache.hadoop.hbase.regionserver.Region; 040import org.apache.hadoop.hbase.regionserver.Store; 041import org.apache.hadoop.hbase.testclassification.SmallTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.junit.Assert; 044import org.junit.ClassRule; 045import org.junit.Test; 046import org.junit.experimental.categories.Category; 047 048import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 049 050@Category(SmallTests.class) 051public class TestStoreHotnessProtector { 052 053 @ClassRule 054 public static final HBaseClassTestRule CLASS_RULE = 055 HBaseClassTestRule.forClass(TestStoreHotnessProtector.class); 056 057 @Test 058 public void testPreparePutCounter() throws Exception { 059 060 ExecutorService executorService = Executors.newFixedThreadPool(10); 061 062 Configuration conf = new Configuration(); 063 conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 0); 064 conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10); 065 conf.setInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3); 066 Region mockRegion = mock(Region.class); 067 StoreHotnessProtector storeHotnessProtector = new StoreHotnessProtector(mockRegion, conf); 068 069 Store mockStore1 = mock(Store.class); 070 RegionInfo mockRegionInfo = mock(RegionInfo.class); 071 byte[] family = Bytes.toBytes("testF1"); 072 073 when(mockRegion.getStore(family)).thenReturn(mockStore1); 074 when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); 075 when(mockRegionInfo.getRegionNameAsString()).thenReturn("test_region_1"); 076 077 when(mockStore1.getCurrentParallelPutCount()).thenReturn(1); 078 when(mockStore1.getColumnFamilyName()).thenReturn("test_Family_1"); 079 080 final Map<byte[], List<Cell>> familyMaps = new HashMap<>(); 081 familyMaps.put(family, Lists.newArrayList(mock(Cell.class), mock(Cell.class))); 082 083 final AtomicReference<Exception> exception = new AtomicReference<>(); 084 085 // PreparePutCounter not access limit 086 087 int threadCount = conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10) 088 * conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3); 089 CountDownLatch countDownLatch = new CountDownLatch(threadCount); 090 091 for (int i = 0; i < threadCount; i++) { 092 executorService.execute(() -> { 093 try { 094 storeHotnessProtector.start(familyMaps); 095 } catch (RegionTooBusyException e) { 096 e.printStackTrace(); 097 exception.set(e); 098 } finally { 099 countDownLatch.countDown(); 100 } 101 }); 102 } 103 104 countDownLatch.await(60, TimeUnit.SECONDS); 105 // no exception 106 Assert.assertEquals(exception.get(), null); 107 Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1); 108 Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(), 109 threadCount); 110 111 // access limit 112 113 try { 114 storeHotnessProtector.start(familyMaps); 115 } catch (RegionTooBusyException e) { 116 e.printStackTrace(); 117 exception.set(e); 118 } 119 120 Assert.assertEquals(exception.get().getClass(), RegionTooBusyException.class); 121 122 Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1); 123 // when access limit, counter will not changed. 124 Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(), 125 threadCount + 1); 126 127 storeHotnessProtector.finish(familyMaps); 128 Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(), 129 threadCount); 130 } 131 132}