/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

/**
 * Verifies that connection attributes and per-request attributes configured on the client are
 * visible on the server side through the current {@link RpcCall}.
 * <p>
 * The server-side check is performed by {@link AttributesCoprocessor}, which is installed on every
 * table created by this test. The coprocessor compares the attributes carried by the RPC with the
 * static {@link #REQUEST_ATTRIBUTES} map and records a successful match in
 * {@link #REQUEST_ATTRIBUTES_VALIDATED}.
 */
@Category({ ClientTests.class, MediumTests.class })
public class TestRequestAndConnectionAttributes {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRequestAndConnectionAttributes.class);

  /** Attributes passed to every connection created by these tests. */
  private static final Map<String, byte[]> CONNECTION_ATTRIBUTES = new HashMap<>();
  static {
    CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo"));
  }

  /** Attributes the current test expects the coprocessor to see on each request. */
  private static final Map<String, byte[]> REQUEST_ATTRIBUTES = new HashMap<>();
  private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100);
  /** Set to {@code true} by the coprocessor once a request carried the expected attributes. */
  private static final AtomicBoolean REQUEST_ATTRIBUTES_VALIDATED = new AtomicBoolean(false);
  private static final byte[] REQUEST_ATTRIBUTES_TEST_TABLE_CF = Bytes.toBytes("0");
  private static final TableName REQUEST_ATTRIBUTES_TEST_TABLE =
    TableName.valueOf("testRequestAttributes");

  private static HBaseTestingUtility TEST_UTIL = null;

  /** Starts a single-node mini cluster and creates the shared table with the coprocessor. */
  @BeforeClass
  public static void setUp() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    TEST_UTIL.startMiniCluster(1);
    TEST_UTIL.createTable(REQUEST_ATTRIBUTES_TEST_TABLE,
      new byte[][] { REQUEST_ATTRIBUTES_TEST_TABLE_CF }, 1, HConstants.DEFAULT_BLOCKSIZE,
      AttributesCoprocessor.class.getName());
  }

  /** Stops the mini cluster and releases the shared executor's threads. */
  @AfterClass
  public static void afterClass() throws Exception {
    try {
      TEST_UTIL.shutdownMiniCluster();
    } finally {
      // The pool is created eagerly with the class; without this its threads are never released.
      EXECUTOR_SERVICE.shutdownNow();
    }
  }

  /** Resets the validation flag so that each test observes only its own requests. */
  @Before
  public void setup() {
    REQUEST_ATTRIBUTES_VALIDATED.set(false);
  }

  /**
   * Issues a Get with a large row key and checks that the connection attributes echoed back by the
   * coprocessor (family {@code c}) still match what the client configured.
   */
  @Test
  public void testConnectionHeaderOverwrittenAttributesRemain() throws IOException {
    TableName tableName = TableName.valueOf("testConnectionAttributes");
    byte[] cf = Bytes.toBytes("0");
    TEST_UTIL.createTable(tableName, new byte[][] { cf }, 1, HConstants.DEFAULT_BLOCKSIZE,
      AttributesCoprocessor.class.getName());

    Configuration conf = TEST_UTIL.getConfiguration();
    try (
      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
        CONNECTION_ATTRIBUTES);
      Table table = conn.getTable(tableName)) {

      // submit a 300 byte rowkey here to encourage netty's allocator to overwrite the connection
      // header
      byte[] bytes = new byte[300];
      new Random().nextBytes(bytes);
      Result result = table.get(new Get(bytes));

      assertEquals(CONNECTION_ATTRIBUTES.size(), result.size());
      for (Map.Entry<String, byte[]> attr : CONNECTION_ATTRIBUTES.entrySet()) {
        byte[] val = result.getValue(Bytes.toBytes("c"), Bytes.toBytes(attr.getKey()));
        assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val));
      }
    }
  }

  /** Request attributes must reach the server on a single Get. */
  @Test
  public void testRequestAttributesGet() throws IOException {
    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
    addRandomRequestAttributes();

    Configuration conf = TEST_UTIL.getConfiguration();
    try (
      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
        CONNECTION_ATTRIBUTES);
      Table table = configureRequestAttributes(
        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {

      table.get(new Get(Bytes.toBytes(0)));
    }

    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
  }

  /** Request attributes must reach the server on a batched Get. */
  @Test
  public void testRequestAttributesMultiGet() throws IOException {
    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
    addRandomRequestAttributes();

    Configuration conf = TEST_UTIL.getConfiguration();
    try (
      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
        CONNECTION_ATTRIBUTES);
      Table table = configureRequestAttributes(
        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
      List<Get> gets = ImmutableList.of(new Get(Bytes.toBytes(0)), new Get(Bytes.toBytes(1)));
      table.get(gets);
    }

    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
  }

  /** Request attributes must reach the server on an exists() call. */
  @Test
  public void testRequestAttributesExists() throws IOException {
    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
    addRandomRequestAttributes();

    Configuration conf = TEST_UTIL.getConfiguration();
    try (
      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
        CONNECTION_ATTRIBUTES);
      Table table = configureRequestAttributes(
        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {

      table.exists(new Get(Bytes.toBytes(0)));
    }

    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
  }

  /** Request attributes must reach the server on a Scan. */
  @Test
  public void testRequestAttributesScan() throws IOException {
    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
    addRandomRequestAttributes();

    Configuration conf = TEST_UTIL.getConfiguration();
    try (
      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
        CONNECTION_ATTRIBUTES);
      Table table = configureRequestAttributes(
        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build();
      // The scanner is a resource of its own and is closed here rather than left open.
      ResultScanner scanner = table.getScanner(new Scan())) {
      scanner.next();
    }
    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
  }

  /** Request attributes must reach the server on a single Put. */
  @Test
  public void testRequestAttributesPut() throws IOException {
    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
    addRandomRequestAttributes();

    Configuration conf = TEST_UTIL.getConfiguration();
    try (
      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
        CONNECTION_ATTRIBUTES);
      Table table = configureRequestAttributes(
        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
      Put put = new Put(Bytes.toBytes("a"));
      put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v"));
      table.put(put);
    }
    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
  }

  /** Request attributes must reach the server on a batched Put. */
  @Test
  public void testRequestAttributesMultiPut() throws IOException {
    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
    addRandomRequestAttributes();

    Configuration conf = TEST_UTIL.getConfiguration();
    try (
      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
        CONNECTION_ATTRIBUTES);
      Table table = configureRequestAttributes(
        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
      Put put1 = new Put(Bytes.toBytes("a"));
      put1.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v"));
      Put put2 = new Put(Bytes.toBytes("b"));
      put2.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v"));
      // Submit the puts as a list so this test differs from the single-put test above.
      table.put(ImmutableList.of(put1, put2));
    }
    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
  }

  /** Request attributes must reach the server on a CheckAndMutate wrapping a Put. */
  @Test
  public void testRequestAttributesCheckAndMutate() throws IOException {
    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
    addRandomRequestAttributes();

    Configuration conf = TEST_UTIL.getConfiguration();
    try (
      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
        CONNECTION_ATTRIBUTES);
      Table table = configureRequestAttributes(
        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
      Put put = new Put(Bytes.toBytes("a"));
      put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v"));
      CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(Bytes.toBytes("a"))
        .ifEquals(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v"))
        .build(put);
      table.checkAndMutate(checkAndMutate);
    }
    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
  }

  /**
   * With no request attributes configured on the table, the coprocessor must see an empty
   * attribute map, which matches the cleared {@link #REQUEST_ATTRIBUTES}.
   */
  @Test
  public void testNoRequestAttributes() throws IOException {
    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
    TableName tableName = TableName.valueOf("testNoRequestAttributesScan");
    TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1,
      HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName());

    REQUEST_ATTRIBUTES.clear();
    Configuration conf = TEST_UTIL.getConfiguration();
    try (Connection conn = ConnectionFactory.createConnection(conf, null,
      AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) {
      TableBuilder tableBuilder = conn.getTableBuilder(tableName, null);
      try (Table table = tableBuilder.build()) {
        table.get(new Get(Bytes.toBytes(0)));
        assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
      }
    }
  }

  /**
   * Replaces the contents of {@link #REQUEST_ATTRIBUTES} with between 2 and 9 entries whose keys
   * are the decimal indices and whose values are random UUID strings.
   */
  private void addRandomRequestAttributes() {
    REQUEST_ATTRIBUTES.clear();
    int j = Math.max(2, (int) (10 * Math.random()));
    for (int i = 0; i < j; i++) {
      REQUEST_ATTRIBUTES.put(String.valueOf(i), Bytes.toBytes(UUID.randomUUID().toString()));
    }
  }

  /**
   * Copies every entry of {@link #REQUEST_ATTRIBUTES} onto the given builder.
   * @param tableBuilder builder to configure
   * @return the same builder, for chaining
   */
  private static TableBuilder configureRequestAttributes(TableBuilder tableBuilder) {
    REQUEST_ATTRIBUTES.forEach(tableBuilder::setRequestAttribute);
    return tableBuilder;
  }

  /**
   * Region coprocessor that inspects the attributes of the current RPC call on get, scanner-next
   * and put hooks, and flags {@link #REQUEST_ATTRIBUTES_VALIDATED} when they match
   * {@link #REQUEST_ATTRIBUTES}.
   */
  public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    /**
     * Validates the request attributes, then bypasses the real Get and instead returns one cell
     * per request attribute (family {@code r}) and one per connection attribute (family
     * {@code c}), so the client can read back what the server saw.
     */
    @Override
    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
      List<Cell> result) throws IOException {
      validateRequestAttributes();

      // for connection attrs test
      RpcCall rpcCall = RpcServer.getCurrentCall().get();
      for (Map.Entry<String, byte[]> attr : rpcCall.getRequestAttributes().entrySet()) {
        result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow())
          .setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getKey()))
          .setValue(attr.getValue()).setType(Cell.Type.Put).setTimestamp(1).build());
      }
      for (Map.Entry<String, byte[]> attr : rpcCall.getConnectionAttributes().entrySet()) {
        result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow())
          .setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getKey()))
          .setValue(attr.getValue()).setType(Cell.Type.Put).setTimestamp(1).build());
      }
      result.sort(CellComparator.getInstance());
      c.bypass();
    }

    @Override
    public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
      InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
      validateRequestAttributes();
      return hasNext;
    }

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit)
      throws IOException {
      validateRequestAttributes();
    }

    /**
     * Sets {@link #REQUEST_ATTRIBUTES_VALIDATED} to {@code true} when the current RPC call's
     * request attributes have the same size, keys and byte values as {@link #REQUEST_ATTRIBUTES};
     * otherwise leaves the flag untouched.
     */
    private void validateRequestAttributes() {
      RpcCall rpcCall = RpcServer.getCurrentCall().get();
      Map<String, byte[]> attrs = rpcCall.getRequestAttributes();
      if (attrs.size() != REQUEST_ATTRIBUTES.size()) {
        return;
      }
      for (Map.Entry<String, byte[]> attr : attrs.entrySet()) {
        if (!REQUEST_ATTRIBUTES.containsKey(attr.getKey())) {
          return;
        }
        if (!Arrays.equals(REQUEST_ATTRIBUTES.get(attr.getKey()), attr.getValue())) {
          return;
        }
      }
      REQUEST_ATTRIBUTES_VALIDATED.set(true);
    }
  }
}