001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.util.ArrayList; 021import java.util.Arrays; 022import java.util.List; 023import java.util.concurrent.CompletableFuture; 024import org.apache.hadoop.hbase.HBaseClassTestRule; 025import org.apache.hadoop.hbase.HBaseTestingUtil; 026import org.apache.hadoop.hbase.TableName; 027import org.apache.hadoop.hbase.regionserver.MetricsRegionServer; 028import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource; 029import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceImpl; 030import org.apache.hadoop.hbase.testclassification.ClientTests; 031import org.apache.hadoop.hbase.testclassification.MediumTests; 032import org.apache.hadoop.hbase.util.Bytes; 033import org.apache.hadoop.hbase.util.JVMClusterUtil; 034import org.junit.AfterClass; 035import org.junit.Assert; 036import org.junit.BeforeClass; 037import org.junit.ClassRule; 038import org.junit.Test; 039import org.junit.experimental.categories.Category; 040 041import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 042 043@Category({ MediumTests.class, ClientTests.class }) 044public class TestAsyncTableQueryMetrics { 045 046 @ClassRule 047 public static final HBaseClassTestRule CLASS_RULE = 048 HBaseClassTestRule.forClass(TestAsyncTableQueryMetrics.class); 049 050 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 051 052 private static final TableName TABLE_NAME = TableName.valueOf("ResultMetrics"); 053 054 private static final byte[] CF = Bytes.toBytes("cf"); 055 056 private static final byte[] CQ = Bytes.toBytes("cq"); 057 058 private static final byte[] VALUE = Bytes.toBytes("value"); 059 060 private static final byte[] ROW_1 = Bytes.toBytes("zzz1"); 061 private static final byte[] ROW_2 = Bytes.toBytes("zzz2"); 062 private static final byte[] ROW_3 = Bytes.toBytes("zzz3"); 063 064 private static AsyncConnection CONN; 065 066 @BeforeClass 067 public static void setUp() throws Exception { 068 UTIL.startMiniCluster(3); 069 // Create 3 rows in the table, with rowkeys starting with "zzz*" so that 070 // scan are forced to hit all the regions. 071 try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) { 072 table.put(Arrays.asList(new Put(ROW_1).addColumn(CF, CQ, VALUE), 073 new Put(ROW_2).addColumn(CF, CQ, VALUE), new Put(ROW_3).addColumn(CF, CQ, VALUE))); 074 } 075 CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); 076 CONN.getAdmin().flush(TABLE_NAME).join(); 077 } 078 079 @AfterClass 080 public static void tearDown() throws Exception { 081 Closeables.close(CONN, true); 082 UTIL.shutdownMiniCluster(); 083 } 084 085 @Test 086 public void itTestsGets() throws Exception { 087 // Test a single Get 088 Get g1 = new Get(ROW_1); 089 g1.setQueryMetricsEnabled(true); 090 091 long bbs = getClusterBlockBytesScanned(); 092 Result result = CONN.getTable(TABLE_NAME).get(g1).get(); 093 bbs += result.getMetrics().getBlockBytesScanned(); 094 Assert.assertNotNull(result.getMetrics()); 095 Assert.assertEquals(getClusterBlockBytesScanned(), bbs); 096 097 // Test multigets 098 Get g2 = new Get(ROW_2); 099 g2.setQueryMetricsEnabled(true); 100 101 Get g3 = new Get(ROW_3); 102 g3.setQueryMetricsEnabled(true); 103 104 List<CompletableFuture<Result>> futures = CONN.getTable(TABLE_NAME).get(List.of(g1, g2, g3)); 105 106 for (CompletableFuture<Result> future : futures) { 107 result = future.join(); 108 Assert.assertNotNull(result.getMetrics()); 109 bbs += result.getMetrics().getBlockBytesScanned(); 110 } 111 112 Assert.assertEquals(getClusterBlockBytesScanned(), bbs); 113 } 114 115 @Test 116 public void itTestsDefaultGetNoMetrics() throws Exception { 117 // Test a single Get 118 Get g1 = new Get(ROW_1); 119 120 Result result = CONN.getTable(TABLE_NAME).get(g1).get(); 121 Assert.assertNull(result.getMetrics()); 122 123 // Test multigets 124 Get g2 = new Get(ROW_2); 125 Get g3 = new Get(ROW_3); 126 List<CompletableFuture<Result>> futures = CONN.getTable(TABLE_NAME).get(List.of(g1, g2, g3)); 127 futures.forEach(f -> Assert.assertNull(f.join().getMetrics())); 128 129 } 130 131 @Test 132 public void itTestsScans() { 133 Scan scan = new Scan(); 134 scan.setQueryMetricsEnabled(true); 135 136 long bbs = getClusterBlockBytesScanned(); 137 try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(scan)) { 138 for (Result result : scanner) { 139 Assert.assertNotNull(result.getMetrics()); 140 bbs += result.getMetrics().getBlockBytesScanned(); 141 Assert.assertEquals(getClusterBlockBytesScanned(), bbs); 142 } 143 } 144 } 145 146 @Test 147 public void itTestsDefaultScanNoMetrics() { 148 Scan scan = new Scan(); 149 150 try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(scan)) { 151 for (Result result : scanner) { 152 Assert.assertNull(result.getMetrics()); 153 } 154 } 155 } 156 157 @Test 158 public void itTestsAtomicOperations() { 159 CheckAndMutate cam = CheckAndMutate.newBuilder(ROW_1).ifEquals(CF, CQ, VALUE) 160 .queryMetricsEnabled(true).build(new Put(ROW_1).addColumn(CF, CQ, VALUE)); 161 162 long bbs = getClusterBlockBytesScanned(); 163 CheckAndMutateResult result = CONN.getTable(TABLE_NAME).checkAndMutate(cam).join(); 164 QueryMetrics metrics = result.getMetrics(); 165 166 Assert.assertNotNull(metrics); 167 Assert.assertEquals(getClusterBlockBytesScanned(), bbs + metrics.getBlockBytesScanned()); 168 169 bbs = getClusterBlockBytesScanned(); 170 List<CheckAndMutate> batch = new ArrayList<>(); 171 batch.add(cam); 172 batch.add(CheckAndMutate.newBuilder(ROW_2).queryMetricsEnabled(true).ifEquals(CF, CQ, VALUE) 173 .build(new Put(ROW_2).addColumn(CF, CQ, VALUE))); 174 batch.add(CheckAndMutate.newBuilder(ROW_3).queryMetricsEnabled(true).ifEquals(CF, CQ, VALUE) 175 .build(new Put(ROW_3).addColumn(CF, CQ, VALUE))); 176 177 List<Object> res = CONN.getTable(TABLE_NAME).batchAll(batch).join(); 178 long totalBbs = res.stream() 179 .mapToLong(r -> ((CheckAndMutateResult) r).getMetrics().getBlockBytesScanned()).sum(); 180 Assert.assertEquals(getClusterBlockBytesScanned(), bbs + totalBbs); 181 182 bbs = getClusterBlockBytesScanned(); 183 184 // flush to force fetch from disk 185 CONN.getAdmin().flush(TABLE_NAME).join(); 186 List<CompletableFuture<Object>> futures = CONN.getTable(TABLE_NAME).batch(batch); 187 188 totalBbs = futures.stream().map(CompletableFuture::join) 189 .mapToLong(r -> ((CheckAndMutateResult) r).getMetrics().getBlockBytesScanned()).sum(); 190 Assert.assertEquals(getClusterBlockBytesScanned(), bbs + totalBbs); 191 } 192 193 @Test 194 public void itTestsDefaultAtomicOperations() { 195 CheckAndMutate cam = CheckAndMutate.newBuilder(ROW_1).ifEquals(CF, CQ, VALUE) 196 .build(new Put(ROW_1).addColumn(CF, CQ, VALUE)); 197 198 CheckAndMutateResult result = CONN.getTable(TABLE_NAME).checkAndMutate(cam).join(); 199 QueryMetrics metrics = result.getMetrics(); 200 201 Assert.assertNull(metrics); 202 203 List<CheckAndMutate> batch = new ArrayList<>(); 204 batch.add(cam); 205 batch.add(CheckAndMutate.newBuilder(ROW_2).ifEquals(CF, CQ, VALUE) 206 .build(new Put(ROW_2).addColumn(CF, CQ, VALUE))); 207 batch.add(CheckAndMutate.newBuilder(ROW_3).ifEquals(CF, CQ, VALUE) 208 .build(new Put(ROW_3).addColumn(CF, CQ, VALUE))); 209 210 List<Object> res = CONN.getTable(TABLE_NAME).batchAll(batch).join(); 211 for (Object r : res) { 212 Assert.assertNull(((CheckAndMutateResult) r).getMetrics()); 213 } 214 215 // flush to force fetch from disk 216 CONN.getAdmin().flush(TABLE_NAME).join(); 217 List<CompletableFuture<Object>> futures = CONN.getTable(TABLE_NAME).batch(batch); 218 219 for (CompletableFuture<Object> future : futures) { 220 Object r = future.join(); 221 Assert.assertNull(((CheckAndMutateResult) r).getMetrics()); 222 } 223 } 224 225 private static long getClusterBlockBytesScanned() { 226 long bbs = 0L; 227 228 for (JVMClusterUtil.RegionServerThread rs : UTIL.getHBaseCluster().getRegionServerThreads()) { 229 MetricsRegionServer metrics = rs.getRegionServer().getMetrics(); 230 MetricsRegionServerSourceImpl source = 231 (MetricsRegionServerSourceImpl) metrics.getMetricsSource(); 232 233 bbs += source.getMetricsRegistry() 234 .getCounter(MetricsRegionServerSource.BLOCK_BYTES_SCANNED_KEY, 0L).value(); 235 } 236 237 return bbs; 238 } 239}