001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.Random;
031import java.util.UUID;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034import java.util.concurrent.atomic.AtomicBoolean;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.AuthUtil;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.CellComparator;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestingUtility;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.coprocessor.ObserverContext;
044import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
046import org.apache.hadoop.hbase.coprocessor.RegionObserver;
047import org.apache.hadoop.hbase.ipc.RpcCall;
048import org.apache.hadoop.hbase.ipc.RpcServer;
049import org.apache.hadoop.hbase.regionserver.InternalScanner;
050import org.apache.hadoop.hbase.testclassification.ClientTests;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.wal.WALEdit;
054import org.junit.AfterClass;
055import org.junit.Before;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060
061import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
062
063@Category({ ClientTests.class, MediumTests.class })
064public class TestRequestAndConnectionAttributes {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestRequestAndConnectionAttributes.class);
069
070  private static final Map<String, byte[]> CONNECTION_ATTRIBUTES = new HashMap<>();
071  static {
072    CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo"));
073  }
074  private static final Map<String, byte[]> REQUEST_ATTRIBUTES = new HashMap<>();
075  private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100);
076  private static final AtomicBoolean REQUEST_ATTRIBUTES_VALIDATED = new AtomicBoolean(false);
077  private static final byte[] REQUEST_ATTRIBUTES_TEST_TABLE_CF = Bytes.toBytes("0");
078  private static final TableName REQUEST_ATTRIBUTES_TEST_TABLE =
079    TableName.valueOf("testRequestAttributes");
080
081  private static HBaseTestingUtility TEST_UTIL = null;
082
083  @BeforeClass
084  public static void setUp() throws Exception {
085    TEST_UTIL = new HBaseTestingUtility();
086    TEST_UTIL.startMiniCluster(1);
087    TEST_UTIL.createTable(REQUEST_ATTRIBUTES_TEST_TABLE,
088      new byte[][] { REQUEST_ATTRIBUTES_TEST_TABLE_CF }, 1, HConstants.DEFAULT_BLOCKSIZE,
089      AttributesCoprocessor.class.getName());
090  }
091
092  @AfterClass
093  public static void afterClass() throws Exception {
094    TEST_UTIL.shutdownMiniCluster();
095  }
096
097  @Before
098  public void setup() {
099    REQUEST_ATTRIBUTES_VALIDATED.getAndSet(false);
100  }
101
102  @Test
103  public void testConnectionHeaderOverwrittenAttributesRemain() throws IOException {
104    TableName tableName = TableName.valueOf("testConnectionAttributes");
105    byte[] cf = Bytes.toBytes("0");
106    TEST_UTIL.createTable(tableName, new byte[][] { cf }, 1, HConstants.DEFAULT_BLOCKSIZE,
107      AttributesCoprocessor.class.getName());
108
109    Configuration conf = TEST_UTIL.getConfiguration();
110    try (Connection conn = ConnectionFactory.createConnection(conf, null,
111      AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(tableName)) {
112
113      // submit a 300 byte rowkey here to encourage netty's allocator to overwrite the connection
114      // header
115      byte[] bytes = new byte[300];
116      new Random().nextBytes(bytes);
117      Result result = table.get(new Get(bytes));
118
119      assertEquals(CONNECTION_ATTRIBUTES.size(), result.size());
120      for (Map.Entry<String, byte[]> attr : CONNECTION_ATTRIBUTES.entrySet()) {
121        byte[] val = result.getValue(Bytes.toBytes("c"), Bytes.toBytes(attr.getKey()));
122        assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val));
123      }
124    }
125  }
126
127  @Test
128  public void testRequestAttributesGet() throws IOException {
129    addRandomRequestAttributes();
130
131    Configuration conf = TEST_UTIL.getConfiguration();
132    try (
133      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
134        CONNECTION_ATTRIBUTES);
135      Table table = configureRequestAttributes(
136        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
137
138      table.get(new Get(Bytes.toBytes(0)));
139    }
140
141    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
142  }
143
144  @Test
145  public void testRequestAttributesMultiGet() throws IOException {
146    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
147    addRandomRequestAttributes();
148
149    Configuration conf = TEST_UTIL.getConfiguration();
150    try (
151      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
152        CONNECTION_ATTRIBUTES);
153      Table table = configureRequestAttributes(
154        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
155      List<Get> gets = ImmutableList.of(new Get(Bytes.toBytes(0)), new Get(Bytes.toBytes(1)));
156      table.get(gets);
157    }
158
159    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
160  }
161
162  @Test
163  public void testRequestAttributesExists() throws IOException {
164    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
165    addRandomRequestAttributes();
166
167    Configuration conf = TEST_UTIL.getConfiguration();
168    try (
169      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
170        CONNECTION_ATTRIBUTES);
171      Table table = configureRequestAttributes(
172        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
173
174      table.exists(new Get(Bytes.toBytes(0)));
175    }
176
177    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
178  }
179
180  @Test
181  public void testRequestAttributesScan() throws IOException {
182    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
183    addRandomRequestAttributes();
184
185    Configuration conf = TEST_UTIL.getConfiguration();
186    try (
187      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
188        CONNECTION_ATTRIBUTES);
189      Table table = configureRequestAttributes(
190        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
191      ResultScanner scanner = table.getScanner(new Scan());
192      scanner.next();
193    }
194    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
195  }
196
197  @Test
198  public void testRequestAttributesPut() throws IOException {
199    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
200    addRandomRequestAttributes();
201
202    Configuration conf = TEST_UTIL.getConfiguration();
203    try (
204      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
205        CONNECTION_ATTRIBUTES);
206      Table table = configureRequestAttributes(
207        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
208      Put put = new Put(Bytes.toBytes("a"));
209      put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v"));
210      table.put(put);
211    }
212    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
213  }
214
215  @Test
216  public void testRequestAttributesMultiPut() throws IOException {
217    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
218    addRandomRequestAttributes();
219
220    Configuration conf = TEST_UTIL.getConfiguration();
221    try (
222      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
223        CONNECTION_ATTRIBUTES);
224      Table table = configureRequestAttributes(
225        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
226      Put put = new Put(Bytes.toBytes("a"));
227      put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v"));
228      table.put(put);
229    }
230    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
231  }
232
233  @Test
234  public void testRequestAttributesCheckAndMutate() throws IOException {
235    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
236    addRandomRequestAttributes();
237
238    Configuration conf = TEST_UTIL.getConfiguration();
239    try (
240      Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
241        CONNECTION_ATTRIBUTES);
242      Table table = configureRequestAttributes(
243        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) {
244      Put put = new Put(Bytes.toBytes("a"));
245      put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v"));
246      CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(Bytes.toBytes("a"))
247        .ifEquals(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v"))
248        .build(put);
249      table.checkAndMutate(checkAndMutate);
250    }
251    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
252  }
253
254  @Test
255  public void testNoRequestAttributes() throws IOException {
256    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
257    TableName tableName = TableName.valueOf("testNoRequestAttributesScan");
258    TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1,
259      HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName());
260
261    REQUEST_ATTRIBUTES.clear();
262    Configuration conf = TEST_UTIL.getConfiguration();
263    try (Connection conn = ConnectionFactory.createConnection(conf, null,
264      AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) {
265      TableBuilder tableBuilder = conn.getTableBuilder(tableName, null);
266      try (Table table = tableBuilder.build()) {
267        table.get(new Get(Bytes.toBytes(0)));
268        assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
269      }
270    }
271  }
272
273  private void addRandomRequestAttributes() {
274    REQUEST_ATTRIBUTES.clear();
275    int j = Math.max(2, (int) (10 * Math.random()));
276    for (int i = 0; i < j; i++) {
277      REQUEST_ATTRIBUTES.put(String.valueOf(i), Bytes.toBytes(UUID.randomUUID().toString()));
278    }
279  }
280
281  private static TableBuilder configureRequestAttributes(TableBuilder tableBuilder) {
282    REQUEST_ATTRIBUTES.forEach(tableBuilder::setRequestAttribute);
283    return tableBuilder;
284  }
285
286  public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor {
287
288    @Override
289    public Optional<RegionObserver> getRegionObserver() {
290      return Optional.of(this);
291    }
292
293    @Override
294    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
295      List<Cell> result) throws IOException {
296      validateRequestAttributes();
297
298      // for connection attrs test
299      RpcCall rpcCall = RpcServer.getCurrentCall().get();
300      for (Map.Entry<String, byte[]> attr : rpcCall.getRequestAttributes().entrySet()) {
301        result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow())
302          .setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getKey()))
303          .setValue(attr.getValue()).setType(Cell.Type.Put).setTimestamp(1).build());
304      }
305      for (Map.Entry<String, byte[]> attr : rpcCall.getConnectionAttributes().entrySet()) {
306        result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow())
307          .setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getKey()))
308          .setValue(attr.getValue()).setType(Cell.Type.Put).setTimestamp(1).build());
309      }
310      result.sort(CellComparator.getInstance());
311      c.bypass();
312    }
313
314    @Override
315    public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
316      InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
317      validateRequestAttributes();
318      return hasNext;
319    }
320
321    @Override
322    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit)
323      throws IOException {
324      validateRequestAttributes();
325    }
326
327    private void validateRequestAttributes() {
328      RpcCall rpcCall = RpcServer.getCurrentCall().get();
329      Map<String, byte[]> attrs = rpcCall.getRequestAttributes();
330      if (attrs.size() != REQUEST_ATTRIBUTES.size()) {
331        return;
332      }
333      for (Map.Entry<String, byte[]> attr : attrs.entrySet()) {
334        if (!REQUEST_ATTRIBUTES.containsKey(attr.getKey())) {
335          return;
336        }
337        if (!Arrays.equals(REQUEST_ATTRIBUTES.get(attr.getKey()), attr.getValue())) {
338          return;
339        }
340      }
341      REQUEST_ATTRIBUTES_VALIDATED.getAndSet(true);
342    }
343  }
344}