001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; 021 022import io.opentelemetry.context.Context; 023import io.opentelemetry.context.Scope; 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.util.Map; 027import java.util.Queue; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.LinkedBlockingQueue; 031import java.util.concurrent.atomic.AtomicLong; 032import java.util.concurrent.locks.Condition; 033import java.util.concurrent.locks.Lock; 034import java.util.concurrent.locks.ReentrantLock; 035import java.util.function.Consumer; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 039import org.apache.hadoop.hbase.util.Threads; 040import org.apache.yetus.audience.InterfaceAudience; 041 042/** 043 * ClientAsyncPrefetchScanner implements async scanner behaviour. Specifically, the cache used by 044 * this scanner is a concurrent queue which allows both the producer (hbase client) and consumer 045 * (application) to access the queue in parallel. The number of rows returned in a prefetch is 046 * defined by the caching factor and the result size factor. This class allocates a buffer cache, 047 * whose size is a function of both factors. The prefetch is invoked when the cache is half-filled, 048 * instead of waiting for it to be empty. This is defined in the method 049 * {@link ClientAsyncPrefetchScanner#prefetchCondition()}. 050 */ 051@InterfaceAudience.Private 052public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { 053 054 private long maxCacheSize; 055 private AtomicLong cacheSizeInBytes; 056 // exception queue (from prefetch to main scan execution) 057 private final Queue<Exception> exceptionsQueue; 058 // used for testing 059 private Consumer<Boolean> prefetchListener; 060 061 private final Lock lock = new ReentrantLock(); 062 private final Condition notEmpty = lock.newCondition(); 063 private final Condition notFull = lock.newCondition(); 064 065 public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, Scan scanForMetrics, 066 TableName name, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, 067 RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout, 068 int scannerTimeout, int replicaCallTimeoutMicroSecondScan, 069 ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes) 070 throws IOException { 071 super(configuration, scan, scanForMetrics, name, connection, rpcCallerFactory, 072 rpcControllerFactory, pool, scanReadRpcTimeout, scannerTimeout, 073 replicaCallTimeoutMicroSecondScan, connectionConfiguration, requestAttributes); 074 exceptionsQueue = new ConcurrentLinkedQueue<>(); 075 final Context context = Context.current(); 076 final Runnable runnable = context.wrap(new PrefetchRunnable()); 077 Threads.setDaemonThreadRunning(new Thread(runnable), name + ".asyncPrefetcher"); 078 } 079 080 void setPrefetchListener(Consumer<Boolean> prefetchListener) { 081 this.prefetchListener = prefetchListener; 082 } 083 084 @Override 085 protected void initCache() { 086 // Override to put a different cache in place of the super's -- a concurrent one. 087 cache = new LinkedBlockingQueue<>(); 088 maxCacheSize = resultSize2CacheSize(maxScannerResultSize); 089 cacheSizeInBytes = new AtomicLong(0); 090 } 091 092 private long resultSize2CacheSize(long maxResultSize) { 093 // * 2 if possible 094 return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; 095 } 096 097 @Override 098 public Result next() throws IOException { 099 try (Scope ignored = span.makeCurrent()) { 100 lock.lock(); 101 try { 102 while (cache.isEmpty()) { 103 handleException(); 104 if (this.closed) { 105 return null; 106 } 107 try { 108 notEmpty.await(); 109 } catch (InterruptedException e) { 110 span.recordException(e); 111 throw new InterruptedIOException("Interrupted when wait to load cache"); 112 } 113 } 114 Result result = pollCache(); 115 if (prefetchCondition()) { 116 notFull.signalAll(); 117 } 118 return result; 119 } finally { 120 lock.unlock(); 121 handleException(); 122 } 123 } 124 } 125 126 @Override 127 public void close() { 128 lock.lock(); 129 try { 130 super.close(); 131 closed = true; 132 notFull.signalAll(); 133 notEmpty.signalAll(); 134 } finally { 135 lock.unlock(); 136 } 137 } 138 139 @Override 140 protected void addEstimatedSize(long estimatedSize) { 141 cacheSizeInBytes.addAndGet(estimatedSize); 142 } 143 144 private void handleException() throws IOException { 145 // The prefetch task running in the background puts any exception it 146 // catches into this exception queue. 147 // Rethrow the exception so the application can handle it. 148 while (!exceptionsQueue.isEmpty()) { 149 Exception first = exceptionsQueue.peek(); 150 first.printStackTrace(); 151 if (first instanceof IOException) { 152 throw (IOException) first; 153 } 154 throw (RuntimeException) first; 155 } 156 } 157 158 private boolean prefetchCondition() { 159 return cacheSizeInBytes.get() < maxCacheSize / 2; 160 } 161 162 private Result pollCache() { 163 Result res = cache.poll(); 164 long estimatedSize = calcEstimatedSize(res); 165 addEstimatedSize(-estimatedSize); 166 return res; 167 } 168 169 private class PrefetchRunnable implements Runnable { 170 171 @Override 172 public void run() { 173 while (!closed) { 174 boolean succeed = false; 175 try { 176 lock.lock(); 177 while (!prefetchCondition()) { 178 notFull.await(); 179 } 180 loadCache(); 181 succeed = true; 182 } catch (Exception e) { 183 exceptionsQueue.add(e); 184 span.recordException(e); 185 } finally { 186 notEmpty.signalAll(); 187 lock.unlock(); 188 if (prefetchListener != null) { 189 prefetchListener.accept(succeed); 190 } 191 } 192 } 193 } 194 } 195 196}