001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.example; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.concurrent.Callable; 024import java.util.concurrent.ExecutorService; 025import java.util.concurrent.Executors; 026import java.util.concurrent.Future; 027import java.util.concurrent.LinkedBlockingQueue; 028import java.util.concurrent.ThreadFactory; 029import java.util.concurrent.ThreadLocalRandom; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.TimeUnit; 032import org.apache.hadoop.conf.Configured; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellBuilderFactory; 035import org.apache.hadoop.hbase.CellBuilderType; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.ConnectionFactory; 039import org.apache.hadoop.hbase.client.Put; 040import org.apache.hadoop.hbase.client.RegionLocator; 041import org.apache.hadoop.hbase.client.Result; 042import org.apache.hadoop.hbase.client.ResultScanner; 043import org.apache.hadoop.hbase.client.Scan; 044import org.apache.hadoop.hbase.client.Table; 045import org.apache.hadoop.hbase.filter.KeyOnlyFilter; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.util.Tool; 048import org.apache.hadoop.util.ToolRunner; 049import org.apache.yetus.audience.InterfaceAudience; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 054 055/** 056 * Example on how to use HBase's {@link Connection} and {@link Table} in a multi-threaded 057 * environment. Each table is a light weight object that is created and thrown away. Connections are 058 * heavy weight objects that hold on to zookeeper connections, async processes, and other state. 059 * 060 * <pre> 061 * Usage: 062 * bin/hbase org.apache.hadoop.hbase.client.example.MultiThreadedClientExample testTableName 500000 063 * </pre> 064 * <p> 065 * The table should already be created before running the command. This example expects one column 066 * family named d. 067 * </p> 068 * <p> 069 * This is meant to show different operations that are likely to be done in a real world 070 * application. These operations are: 071 * </p> 072 * <ul> 073 * <li>30% of all operations performed are batch writes. 30 puts are created and sent out at a time. 074 * The response for all puts is waited on.</li> 075 * <li>20% of all operations are single writes. A single put is sent out and the response is waited 076 * for.</li> 077 * <li>50% of all operations are scans. These scans start at a random place and scan up to 100 rows. 078 * </li> 079 * </ul> 080 */ 081@InterfaceAudience.Private 082public class MultiThreadedClientExample extends Configured implements Tool { 083 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedClientExample.class); 084 private static final int DEFAULT_NUM_OPERATIONS = 500000; 085 086 /** 087 * The name of the column family. d for default. 088 */ 089 private static final byte[] FAMILY = Bytes.toBytes("d"); 090 091 /** 092 * For the example we're just using one qualifier. 093 */ 094 private static final byte[] QUAL = Bytes.toBytes("test"); 095 096 private final ExecutorService internalPool; 097 098 private final int threads; 099 100 public MultiThreadedClientExample() throws IOException { 101 // Base number of threads. 102 // This represents the number of threads you application has 103 // that can be interacting with an hbase client. 104 this.threads = Runtime.getRuntime().availableProcessors() * 4; 105 106 // Daemon threads are great for things that get shut down. 107 ThreadFactory threadFactory = 108 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("internal-pol-%d").build(); 109 110 this.internalPool = Executors.newFixedThreadPool(threads, threadFactory); 111 } 112 113 @Override 114 public int run(String[] args) throws Exception { 115 116 if (args.length < 1 || args.length > 2) { 117 System.out.println("Usage: " + this.getClass().getName() + " tableName [num_operations]"); 118 return -1; 119 } 120 121 final TableName tableName = TableName.valueOf(args[0]); 122 int numOperations = DEFAULT_NUM_OPERATIONS; 123 124 // the second arg is the number of operations to send. 125 if (args.length == 2) { 126 numOperations = Integer.parseInt(args[1]); 127 } 128 129 // Threads for the client only. 130 // 131 // We don't want to mix hbase and business logic. 132 // 133 ThreadPoolExecutor service = new ThreadPoolExecutor(threads * 2, threads * 2, 60L, 134 TimeUnit.SECONDS, new LinkedBlockingQueue<>()); 135 136 // Create two different connections showing how it's possible to 137 // separate different types of requests onto different connections 138 final Connection writeConnection = ConnectionFactory.createConnection(getConf(), service); 139 final Connection readConnection = ConnectionFactory.createConnection(getConf(), service); 140 141 // At this point the entire cache for the region locations is full. 142 // Only do this if the number of regions in a table is easy to fit into memory. 143 // 144 // If you are interacting with more than 25k regions on a client then it's probably not good 145 // to do this at all. 146 warmUpConnectionCache(readConnection, tableName); 147 warmUpConnectionCache(writeConnection, tableName); 148 149 List<Future<Boolean>> futures = new ArrayList<>(numOperations); 150 for (int i = 0; i < numOperations; i++) { 151 double r = ThreadLocalRandom.current().nextDouble(); 152 Future<Boolean> f; 153 154 // For the sake of generating some synthetic load this queues 155 // some different callables. 156 // These callables are meant to represent real work done by your application. 157 if (r < .30) { 158 f = internalPool.submit(new WriteExampleCallable(writeConnection, tableName)); 159 } else if (r < .50) { 160 f = internalPool.submit(new SingleWriteExampleCallable(writeConnection, tableName)); 161 } else { 162 f = internalPool.submit(new ReadExampleCallable(writeConnection, tableName)); 163 } 164 futures.add(f); 165 } 166 167 // Wait a long time for all the reads/writes to complete 168 for (Future<Boolean> f : futures) { 169 f.get(10, TimeUnit.MINUTES); 170 } 171 172 // Clean up after our selves for cleanliness 173 internalPool.shutdownNow(); 174 service.shutdownNow(); 175 return 0; 176 } 177 178 private void warmUpConnectionCache(Connection connection, TableName tn) throws IOException { 179 try (RegionLocator locator = connection.getRegionLocator(tn)) { 180 LOG.info("Warmed up region location cache for " + tn + " got " 181 + locator.getAllRegionLocations().size()); 182 } 183 } 184 185 /** 186 * Class that will show how to send batches of puts at the same time. 187 */ 188 public static class WriteExampleCallable implements Callable<Boolean> { 189 private final Connection connection; 190 private final TableName tableName; 191 192 public WriteExampleCallable(Connection connection, TableName tableName) { 193 this.connection = connection; 194 this.tableName = tableName; 195 } 196 197 @Override 198 public Boolean call() throws Exception { 199 200 // Table implements Closable so we use the try with resource structure here. 201 // https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html 202 try (Table t = connection.getTable(tableName)) { 203 byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble())); 204 int rows = 30; 205 206 // Array to put the batch 207 ArrayList<Put> puts = new ArrayList<>(rows); 208 for (int i = 0; i < 30; i++) { 209 byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong()); 210 Put p = new Put(rk); 211 p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(rk).setFamily(FAMILY) 212 .setQualifier(QUAL).setTimestamp(p.getTimestamp()).setType(Cell.Type.Put) 213 .setValue(value).build()); 214 puts.add(p); 215 } 216 217 // now that we've assembled the batch it's time to push it to hbase. 218 t.put(puts); 219 } 220 return true; 221 } 222 } 223 224 /** 225 * Class to show how to send a single put. 226 */ 227 public static class SingleWriteExampleCallable implements Callable<Boolean> { 228 private final Connection connection; 229 private final TableName tableName; 230 231 public SingleWriteExampleCallable(Connection connection, TableName tableName) { 232 this.connection = connection; 233 this.tableName = tableName; 234 } 235 236 @Override 237 public Boolean call() throws Exception { 238 try (Table t = connection.getTable(tableName)) { 239 240 byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble())); 241 byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong()); 242 Put p = new Put(rk); 243 p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(rk).setFamily(FAMILY) 244 .setQualifier(QUAL).setTimestamp(p.getTimestamp()).setType(Cell.Type.Put).setValue(value) 245 .build()); 246 t.put(p); 247 } 248 return true; 249 } 250 } 251 252 /** 253 * Class to show how to scan some rows starting at a random location. 254 */ 255 public static class ReadExampleCallable implements Callable<Boolean> { 256 private final Connection connection; 257 private final TableName tableName; 258 259 public ReadExampleCallable(Connection connection, TableName tableName) { 260 this.connection = connection; 261 this.tableName = tableName; 262 } 263 264 @Override 265 public Boolean call() throws Exception { 266 267 // total length in bytes of all read rows. 268 int result = 0; 269 270 // Number of rows the scan will read before being considered done. 271 int toRead = 100; 272 try (Table t = connection.getTable(tableName)) { 273 byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong()); 274 Scan s = new Scan(rk); 275 276 // This filter will keep the values from being sent accross the wire. 277 // This is good for counting or other scans that are checking for 278 // existence and don't rely on the value. 279 s.setFilter(new KeyOnlyFilter()); 280 281 // Don't go back to the server for every single row. 282 // We know these rows are small. So ask for 20 at a time. 283 // This would be application specific. 284 // 285 // The goal is to reduce round trips but asking for too 286 // many rows can lead to GC problems on client and server sides. 287 s.setCaching(20); 288 289 // Don't use the cache. While this is a silly test program it's still good to be 290 // explicit that scans normally don't use the block cache. 291 s.setCacheBlocks(false); 292 293 // Open up the scanner and close it automatically when done. 294 try (ResultScanner rs = t.getScanner(s)) { 295 296 // Now go through rows. 297 for (Result r : rs) { 298 // Keep track of things size to simulate doing some real work. 299 result += r.getRow().length; 300 toRead -= 1; 301 302 // Most online applications won't be 303 // reading the entire table so this break 304 // simulates small to medium size scans, 305 // without needing to know an end row. 306 if (toRead <= 0) { 307 break; 308 } 309 } 310 } 311 } 312 return result > 0; 313 } 314 } 315 316 public static void main(String[] args) throws Exception { 317 ToolRunner.run(new MultiThreadedClientExample(), args); 318 } 319}