001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.List;
023import java.util.concurrent.CompletableFuture;
024import org.apache.hadoop.hbase.HBaseClassTestRule;
025import org.apache.hadoop.hbase.HBaseTestingUtil;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
028import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource;
029import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceImpl;
030import org.apache.hadoop.hbase.testclassification.ClientTests;
031import org.apache.hadoop.hbase.testclassification.MediumTests;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.hbase.util.JVMClusterUtil;
034import org.junit.AfterClass;
035import org.junit.Assert;
036import org.junit.BeforeClass;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040
041import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
042
043@Category({ MediumTests.class, ClientTests.class })
044public class TestAsyncTableQueryMetrics {
045
046  @ClassRule
047  public static final HBaseClassTestRule CLASS_RULE =
048    HBaseClassTestRule.forClass(TestAsyncTableQueryMetrics.class);
049
050  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
051
052  private static final TableName TABLE_NAME = TableName.valueOf("ResultMetrics");
053
054  private static final byte[] CF = Bytes.toBytes("cf");
055
056  private static final byte[] CQ = Bytes.toBytes("cq");
057
058  private static final byte[] VALUE = Bytes.toBytes("value");
059
060  private static final byte[] ROW_1 = Bytes.toBytes("zzz1");
061  private static final byte[] ROW_2 = Bytes.toBytes("zzz2");
062  private static final byte[] ROW_3 = Bytes.toBytes("zzz3");
063
064  private static AsyncConnection CONN;
065
066  @BeforeClass
067  public static void setUp() throws Exception {
068    UTIL.startMiniCluster(3);
069    // Create 3 rows in the table, with rowkeys starting with "zzz*" so that
070    // scan are forced to hit all the regions.
071    try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
072      table.put(Arrays.asList(new Put(ROW_1).addColumn(CF, CQ, VALUE),
073        new Put(ROW_2).addColumn(CF, CQ, VALUE), new Put(ROW_3).addColumn(CF, CQ, VALUE)));
074    }
075    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
076    CONN.getAdmin().flush(TABLE_NAME).join();
077  }
078
079  @AfterClass
080  public static void tearDown() throws Exception {
081    Closeables.close(CONN, true);
082    UTIL.shutdownMiniCluster();
083  }
084
085  @Test
086  public void itTestsGets() throws Exception {
087    // Test a single Get
088    Get g1 = new Get(ROW_1);
089    g1.setQueryMetricsEnabled(true);
090
091    long bbs = getClusterBlockBytesScanned();
092    Result result = CONN.getTable(TABLE_NAME).get(g1).get();
093    bbs += result.getMetrics().getBlockBytesScanned();
094    Assert.assertNotNull(result.getMetrics());
095    Assert.assertEquals(getClusterBlockBytesScanned(), bbs);
096
097    // Test multigets
098    Get g2 = new Get(ROW_2);
099    g2.setQueryMetricsEnabled(true);
100
101    Get g3 = new Get(ROW_3);
102    g3.setQueryMetricsEnabled(true);
103
104    List<CompletableFuture<Result>> futures = CONN.getTable(TABLE_NAME).get(List.of(g1, g2, g3));
105
106    for (CompletableFuture<Result> future : futures) {
107      result = future.join();
108      Assert.assertNotNull(result.getMetrics());
109      bbs += result.getMetrics().getBlockBytesScanned();
110    }
111
112    Assert.assertEquals(getClusterBlockBytesScanned(), bbs);
113  }
114
115  @Test
116  public void itTestsDefaultGetNoMetrics() throws Exception {
117    // Test a single Get
118    Get g1 = new Get(ROW_1);
119
120    Result result = CONN.getTable(TABLE_NAME).get(g1).get();
121    Assert.assertNull(result.getMetrics());
122
123    // Test multigets
124    Get g2 = new Get(ROW_2);
125    Get g3 = new Get(ROW_3);
126    List<CompletableFuture<Result>> futures = CONN.getTable(TABLE_NAME).get(List.of(g1, g2, g3));
127    futures.forEach(f -> Assert.assertNull(f.join().getMetrics()));
128
129  }
130
131  @Test
132  public void itTestsScans() {
133    Scan scan = new Scan();
134    scan.setQueryMetricsEnabled(true);
135
136    long bbs = getClusterBlockBytesScanned();
137    try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(scan)) {
138      for (Result result : scanner) {
139        Assert.assertNotNull(result.getMetrics());
140        bbs += result.getMetrics().getBlockBytesScanned();
141        Assert.assertEquals(getClusterBlockBytesScanned(), bbs);
142      }
143    }
144  }
145
146  @Test
147  public void itTestsDefaultScanNoMetrics() {
148    Scan scan = new Scan();
149
150    try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(scan)) {
151      for (Result result : scanner) {
152        Assert.assertNull(result.getMetrics());
153      }
154    }
155  }
156
157  @Test
158  public void itTestsAtomicOperations() {
159    CheckAndMutate cam = CheckAndMutate.newBuilder(ROW_1).ifEquals(CF, CQ, VALUE)
160      .queryMetricsEnabled(true).build(new Put(ROW_1).addColumn(CF, CQ, VALUE));
161
162    long bbs = getClusterBlockBytesScanned();
163    CheckAndMutateResult result = CONN.getTable(TABLE_NAME).checkAndMutate(cam).join();
164    QueryMetrics metrics = result.getMetrics();
165
166    Assert.assertNotNull(metrics);
167    Assert.assertEquals(getClusterBlockBytesScanned(), bbs + metrics.getBlockBytesScanned());
168
169    bbs = getClusterBlockBytesScanned();
170    List<CheckAndMutate> batch = new ArrayList<>();
171    batch.add(cam);
172    batch.add(CheckAndMutate.newBuilder(ROW_2).queryMetricsEnabled(true).ifEquals(CF, CQ, VALUE)
173      .build(new Put(ROW_2).addColumn(CF, CQ, VALUE)));
174    batch.add(CheckAndMutate.newBuilder(ROW_3).queryMetricsEnabled(true).ifEquals(CF, CQ, VALUE)
175      .build(new Put(ROW_3).addColumn(CF, CQ, VALUE)));
176
177    List<Object> res = CONN.getTable(TABLE_NAME).batchAll(batch).join();
178    long totalBbs = res.stream()
179      .mapToLong(r -> ((CheckAndMutateResult) r).getMetrics().getBlockBytesScanned()).sum();
180    Assert.assertEquals(getClusterBlockBytesScanned(), bbs + totalBbs);
181
182    bbs = getClusterBlockBytesScanned();
183
184    // flush to force fetch from disk
185    CONN.getAdmin().flush(TABLE_NAME).join();
186    List<CompletableFuture<Object>> futures = CONN.getTable(TABLE_NAME).batch(batch);
187
188    totalBbs = futures.stream().map(CompletableFuture::join)
189      .mapToLong(r -> ((CheckAndMutateResult) r).getMetrics().getBlockBytesScanned()).sum();
190    Assert.assertEquals(getClusterBlockBytesScanned(), bbs + totalBbs);
191  }
192
193  @Test
194  public void itTestsDefaultAtomicOperations() {
195    CheckAndMutate cam = CheckAndMutate.newBuilder(ROW_1).ifEquals(CF, CQ, VALUE)
196      .build(new Put(ROW_1).addColumn(CF, CQ, VALUE));
197
198    CheckAndMutateResult result = CONN.getTable(TABLE_NAME).checkAndMutate(cam).join();
199    QueryMetrics metrics = result.getMetrics();
200
201    Assert.assertNull(metrics);
202
203    List<CheckAndMutate> batch = new ArrayList<>();
204    batch.add(cam);
205    batch.add(CheckAndMutate.newBuilder(ROW_2).ifEquals(CF, CQ, VALUE)
206      .build(new Put(ROW_2).addColumn(CF, CQ, VALUE)));
207    batch.add(CheckAndMutate.newBuilder(ROW_3).ifEquals(CF, CQ, VALUE)
208      .build(new Put(ROW_3).addColumn(CF, CQ, VALUE)));
209
210    List<Object> res = CONN.getTable(TABLE_NAME).batchAll(batch).join();
211    for (Object r : res) {
212      Assert.assertNull(((CheckAndMutateResult) r).getMetrics());
213    }
214
215    // flush to force fetch from disk
216    CONN.getAdmin().flush(TABLE_NAME).join();
217    List<CompletableFuture<Object>> futures = CONN.getTable(TABLE_NAME).batch(batch);
218
219    for (CompletableFuture<Object> future : futures) {
220      Object r = future.join();
221      Assert.assertNull(((CheckAndMutateResult) r).getMetrics());
222    }
223  }
224
225  private static long getClusterBlockBytesScanned() {
226    long bbs = 0L;
227
228    for (JVMClusterUtil.RegionServerThread rs : UTIL.getHBaseCluster().getRegionServerThreads()) {
229      MetricsRegionServer metrics = rs.getRegionServer().getMetrics();
230      MetricsRegionServerSourceImpl source =
231        (MetricsRegionServerSourceImpl) metrics.getMetricsSource();
232
233      bbs += source.getMetricsRegistry()
234        .getCounter(MetricsRegionServerSource.BLOCK_BYTES_SCANNED_KEY, 0L).value();
235    }
236
237    return bbs;
238  }
239}