001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.trace; 019 020import io.opentelemetry.api.common.AttributeKey; 021import io.opentelemetry.api.common.Attributes; 022import io.opentelemetry.api.trace.Span; 023import io.opentelemetry.context.Scope; 024import java.io.IOException; 025import java.util.Random; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.ThreadLocalRandom; 030import java.util.concurrent.TimeUnit; 031import org.apache.commons.io.IOUtils; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.HBaseConfiguration; 034import org.apache.hadoop.hbase.IntegrationTestingUtility; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Admin; 037import org.apache.hadoop.hbase.client.BufferedMutator; 038import org.apache.hadoop.hbase.client.Get; 039import org.apache.hadoop.hbase.client.Put; 040import org.apache.hadoop.hbase.client.Result; 041import org.apache.hadoop.hbase.client.ResultScanner; 042import org.apache.hadoop.hbase.client.Scan; 043import org.apache.hadoop.hbase.client.Table; 044import org.apache.hadoop.hbase.testclassification.IntegrationTests; 045import org.apache.hadoop.hbase.util.AbstractHBaseTool; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.util.ToolRunner; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050 051import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 052 053@Category(IntegrationTests.class) 054public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { 055 056 public static final String TABLE_ARG = "t"; 057 public static final String CF_ARG = "f"; 058 059 public static final String TABLE_NAME_DEFAULT = "SendTracesTable"; 060 public static final String COLUMN_FAMILY_DEFAULT = "D"; 061 private TableName tableName = TableName.valueOf(TABLE_NAME_DEFAULT); 062 private byte[] familyName = Bytes.toBytes(COLUMN_FAMILY_DEFAULT); 063 private IntegrationTestingUtility util; 064 private Admin admin; 065 066 public static void main(String[] args) throws Exception { 067 Configuration configuration = HBaseConfiguration.create(); 068 IntegrationTestingUtility.setUseDistributedCluster(configuration); 069 IntegrationTestSendTraceRequests tool = new IntegrationTestSendTraceRequests(); 070 ToolRunner.run(configuration, tool, args); 071 } 072 073 @Override 074 protected void addOptions() { 075 addOptWithArg(TABLE_ARG, "The table name to target. Will be created if not there already."); 076 addOptWithArg(CF_ARG, "The family to target"); 077 } 078 079 @Override 080 public void processOptions(CommandLine cmd) { 081 String tableNameString = cmd.getOptionValue(TABLE_ARG, TABLE_NAME_DEFAULT); 082 String familyString = cmd.getOptionValue(CF_ARG, COLUMN_FAMILY_DEFAULT); 083 084 this.tableName = TableName.valueOf(tableNameString); 085 this.familyName = Bytes.toBytes(familyString); 086 } 087 088 @Override 089 public int doWork() throws Exception { 090 internalDoWork(); 091 return 0; 092 } 093 094 @Test 095 public void internalDoWork() throws Exception { 096 util = createUtil(); 097 admin = util.getAdmin(); 098 099 deleteTable(); 100 createTable(); 101 LinkedBlockingQueue<Long> rks = insertData(); 102 103 ExecutorService service = Executors.newFixedThreadPool(20); 104 doScans(service, rks); 105 doGets(service, rks); 106 107 service.shutdown(); 108 service.awaitTermination(100, TimeUnit.SECONDS); 109 Thread.sleep(90000); 110 util.restoreCluster(); 111 util = null; 112 } 113 114 private void doScans(ExecutorService service, final LinkedBlockingQueue<Long> rks) { 115 for (int i = 0; i < 100; i++) { 116 Runnable runnable = new Runnable() { 117 private final LinkedBlockingQueue<Long> rowKeyQueue = rks; 118 119 @Override 120 public void run() { 121 ResultScanner rs = null; 122 Span span = TraceUtil.getGlobalTracer().spanBuilder("Scan").startSpan(); 123 try (Scope scope = span.makeCurrent()) { 124 Table ht = util.getConnection().getTable(tableName); 125 Scan s = new Scan(); 126 s.withStartRow(Bytes.toBytes(rowKeyQueue.take())); 127 s.setBatch(7); 128 rs = ht.getScanner(s); 129 // Something to keep the jvm from removing the loop. 130 long accum = 0; 131 132 for (int x = 0; x < 1000; x++) { 133 Result r = rs.next(); 134 accum |= Bytes.toLong(r.getRow()); 135 } 136 137 span.addEvent("Accum result = " + accum); 138 139 ht.close(); 140 ht = null; 141 } catch (Exception e) { 142 span.addEvent("exception", 143 Attributes.of(AttributeKey.stringKey("exception"), e.getClass().getSimpleName())); 144 } finally { 145 span.end(); 146 if (rs != null) { 147 rs.close(); 148 } 149 } 150 } 151 }; 152 service.execute(runnable); 153 } 154 } 155 156 private void doGets(ExecutorService service, final LinkedBlockingQueue<Long> rowKeys) 157 throws IOException { 158 for (int i = 0; i < 100; i++) { 159 Runnable runnable = new Runnable() { 160 private final LinkedBlockingQueue<Long> rowKeyQueue = rowKeys; 161 162 @Override 163 public void run() { 164 Table ht = null; 165 try { 166 ht = util.getConnection().getTable(tableName); 167 long accum = 0; 168 for (int x = 0; x < 5; x++) { 169 Span span = TraceUtil.getGlobalTracer().spanBuilder("gets").startSpan(); 170 try (Scope scope = span.makeCurrent()) { 171 long rk = rowKeyQueue.take(); 172 Result r1 = ht.get(new Get(Bytes.toBytes(rk))); 173 if (r1 != null) { 174 accum |= Bytes.toLong(r1.getRow()); 175 } 176 Result r2 = ht.get(new Get(Bytes.toBytes(rk))); 177 if (r2 != null) { 178 accum |= Bytes.toLong(r2.getRow()); 179 } 180 span.addEvent("Accum = " + accum); 181 } catch (IOException | InterruptedException ie) { 182 // IGNORED 183 } finally { 184 span.end(); 185 } 186 } 187 } catch (IOException e) { 188 // IGNORED 189 } finally { 190 if (ht != null) { 191 IOUtils.closeQuietly(ht); 192 } 193 } 194 } 195 }; 196 service.execute(runnable); 197 } 198 } 199 200 private void createTable() throws IOException { 201 Span span = TraceUtil.getGlobalTracer().spanBuilder("createTable").startSpan(); 202 try (Scope scope = span.makeCurrent()) { 203 util.createTable(tableName, familyName); 204 } finally { 205 span.end(); 206 } 207 } 208 209 private void deleteTable() throws IOException { 210 Span span = TraceUtil.getGlobalTracer().spanBuilder("deleteTable").startSpan(); 211 try (Scope scope = span.makeCurrent()) { 212 if (admin.tableExists(tableName)) { 213 util.deleteTable(tableName); 214 } 215 } finally { 216 span.end(); 217 } 218 } 219 220 private LinkedBlockingQueue<Long> insertData() throws IOException, InterruptedException { 221 LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<>(25000); 222 BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName); 223 Random rand = ThreadLocalRandom.current(); 224 byte[] value = new byte[300]; 225 for (int x = 0; x < 5000; x++) { 226 Span span = TraceUtil.getGlobalTracer().spanBuilder("insertData").startSpan(); 227 try (Scope scope = span.makeCurrent()) { 228 for (int i = 0; i < 5; i++) { 229 long rk = rand.nextLong(); 230 rowKeys.add(rk); 231 Put p = new Put(Bytes.toBytes(rk)); 232 for (int y = 0; y < 10; y++) { 233 Bytes.random(value); 234 p.addColumn(familyName, Bytes.toBytes(rand.nextLong()), value); 235 } 236 ht.mutate(p); 237 } 238 if ((x % 1000) == 0) { 239 admin.flush(tableName); 240 } 241 } finally { 242 span.end(); 243 } 244 } 245 admin.flush(tableName); 246 return rowKeys; 247 } 248 249 private IntegrationTestingUtility createUtil() throws Exception { 250 Configuration conf = getConf(); 251 if (this.util == null) { 252 IntegrationTestingUtility u; 253 if (conf == null) { 254 u = new IntegrationTestingUtility(); 255 } else { 256 u = new IntegrationTestingUtility(conf); 257 } 258 util = u; 259 util.initializeCluster(1); 260 261 } 262 return this.util; 263 } 264}