001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.mockito.ArgumentMatchers.any; 024import static org.mockito.Mockito.doAnswer; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import java.io.IOException; 029import java.util.Arrays; 030import java.util.List; 031import java.util.concurrent.BlockingDeque; 032import java.util.concurrent.LinkedBlockingDeque; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.ipc.HBaseRpcController; 039import org.apache.hadoop.hbase.testclassification.MediumTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.junit.After; 042import org.junit.Before; 043import org.junit.ClassRule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.junit.runner.RunWith; 047import org.junit.runners.Parameterized; 048import org.mockito.Mockito; 049import org.mockito.invocation.InvocationOnMock; 050import org.mockito.stubbing.Answer; 051 052import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 053import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 054 055import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 060 061/** 062 * Tests logging of large batch commands via Multi. Tests are fast, but uses a mini-cluster (to test 063 * via "Multi" commands) so classified as MediumTests 064 */ 065@RunWith(Parameterized.class) 066@Category(MediumTests.class) 067public class TestMultiLogThreshold { 068 069 @ClassRule 070 public static final HBaseClassTestRule CLASS_RULE = 071 HBaseClassTestRule.forClass(TestMultiLogThreshold.class); 072 073 private static final TableName NAME = TableName.valueOf("tableName"); 074 private static final byte[] TEST_FAM = Bytes.toBytes("fam"); 075 076 private HBaseTestingUtil util; 077 private Configuration conf; 078 private int threshold; 079 private HRegionServer rs; 080 private RSRpcServices services; 081 082 private org.apache.logging.log4j.core.Appender appender; 083 084 @Parameterized.Parameter 085 public static boolean rejectLargeBatchOp; 086 087 @Parameterized.Parameters 088 public static List<Object[]> params() { 089 return Arrays.asList(new Object[] { false }, new Object[] { true }); 090 } 091 092 private final class LevelAndMessage { 093 final org.apache.logging.log4j.Level level; 094 095 final String msg; 096 097 public LevelAndMessage(org.apache.logging.log4j.Level level, String msg) { 098 this.level = level; 099 this.msg = msg; 100 } 101 102 } 103 104 // log4j2 will reuse the LogEvent so we need to copy the level and message out. 105 private BlockingDeque<LevelAndMessage> logs = new LinkedBlockingDeque<>(); 106 107 @Before 108 public void setupTest() throws Exception { 109 util = new HBaseTestingUtil(); 110 conf = util.getConfiguration(); 111 threshold = 112 conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); 113 conf.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp); 114 util.startMiniCluster(); 115 util.createTable(NAME, TEST_FAM); 116 rs = util.getRSForFirstRegionInTable(NAME); 117 appender = mock(org.apache.logging.log4j.core.Appender.class); 118 when(appender.getName()).thenReturn("mockAppender"); 119 when(appender.isStarted()).thenReturn(true); 120 doAnswer(new Answer<Void>() { 121 122 @Override 123 public Void answer(InvocationOnMock invocation) throws Throwable { 124 org.apache.logging.log4j.core.LogEvent logEvent = 125 invocation.getArgument(0, org.apache.logging.log4j.core.LogEvent.class); 126 logs.add( 127 new LevelAndMessage(logEvent.getLevel(), logEvent.getMessage().getFormattedMessage())); 128 return null; 129 } 130 }).when(appender).append(any(org.apache.logging.log4j.core.LogEvent.class)); 131 ((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager 132 .getLogger(RSRpcServices.class)).addAppender(appender); 133 } 134 135 @After 136 public void tearDown() throws Exception { 137 ((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager 138 .getLogger(RSRpcServices.class)).removeAppender(appender); 139 util.shutdownMiniCluster(); 140 } 141 142 private enum ActionType { 143 REGION_ACTIONS, 144 ACTIONS 145 } 146 147 /** 148 * Sends a multi request with a certain amount of rows, will populate Multi command with either 149 * "rows" number of RegionActions with one Action each or one RegionAction with "rows" number of 150 * Actions 151 */ 152 private void sendMultiRequest(int rows, ActionType actionType) 153 throws ServiceException, IOException { 154 RpcController rpcc = Mockito.mock(HBaseRpcController.class); 155 MultiRequest.Builder builder = MultiRequest.newBuilder(); 156 int numRAs = 1; 157 int numAs = 1; 158 switch (actionType) { 159 case REGION_ACTIONS: 160 numRAs = rows; 161 break; 162 case ACTIONS: 163 numAs = rows; 164 break; 165 } 166 for (int i = 0; i < numRAs; i++) { 167 RegionAction.Builder rab = RegionAction.newBuilder(); 168 rab.setRegion(RequestConverter.buildRegionSpecifier( 169 HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, 170 Bytes.toBytes("someStuff" + i))); 171 for (int j = 0; j < numAs; j++) { 172 Action.Builder ab = Action.newBuilder(); 173 rab.addAction(ab.build()); 174 } 175 builder.addRegionAction(rab.build()); 176 } 177 services = new RSRpcServices(rs); 178 services.multi(rpcc, builder.build()); 179 } 180 181 private void assertLogBatchWarnings(boolean expected) { 182 boolean actual = false; 183 for (LevelAndMessage event : logs) { 184 if ( 185 event.level == org.apache.logging.log4j.Level.WARN 186 && event.msg.contains("Large batch operation detected") 187 ) { 188 actual = true; 189 break; 190 } 191 } 192 logs.clear(); 193 assertEquals(expected, actual); 194 } 195 196 @Test 197 public void testMultiLogThresholdRegionActions() throws ServiceException, IOException { 198 try { 199 sendMultiRequest(threshold + 1, ActionType.REGION_ACTIONS); 200 assertFalse(rejectLargeBatchOp); 201 } catch (ServiceException e) { 202 assertTrue(rejectLargeBatchOp); 203 } 204 assertLogBatchWarnings(true); 205 206 sendMultiRequest(threshold, ActionType.REGION_ACTIONS); 207 assertLogBatchWarnings(false); 208 209 try { 210 sendMultiRequest(threshold + 1, ActionType.ACTIONS); 211 assertFalse(rejectLargeBatchOp); 212 } catch (ServiceException e) { 213 assertTrue(rejectLargeBatchOp); 214 } 215 assertLogBatchWarnings(true); 216 217 sendMultiRequest(threshold, ActionType.ACTIONS); 218 assertLogBatchWarnings(false); 219 } 220}