001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
021
022import io.opentelemetry.context.Context;
023import io.opentelemetry.context.Scope;
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.Map;
027import java.util.Queue;
028import java.util.concurrent.ConcurrentLinkedQueue;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.atomic.AtomicLong;
032import java.util.concurrent.locks.Condition;
033import java.util.concurrent.locks.Lock;
034import java.util.concurrent.locks.ReentrantLock;
035import java.util.function.Consumer;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
039import org.apache.hadoop.hbase.util.Threads;
040import org.apache.yetus.audience.InterfaceAudience;
041
042/**
043 * ClientAsyncPrefetchScanner implements async scanner behaviour. Specifically, the cache used by
044 * this scanner is a concurrent queue which allows both the producer (hbase client) and consumer
045 * (application) to access the queue in parallel. The number of rows returned in a prefetch is
046 * defined by the caching factor and the result size factor. This class allocates a buffer cache,
047 * whose size is a function of both factors. The prefetch is invoked when the cache is half-filled,
048 * instead of waiting for it to be empty. This is defined in the method
049 * {@link ClientAsyncPrefetchScanner#prefetchCondition()}.
050 */
051@InterfaceAudience.Private
052public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
053
054  private long maxCacheSize;
055  private AtomicLong cacheSizeInBytes;
056  // exception queue (from prefetch to main scan execution)
057  private final Queue<Exception> exceptionsQueue;
058  // used for testing
059  private Consumer<Boolean> prefetchListener;
060
061  private final Lock lock = new ReentrantLock();
062  private final Condition notEmpty = lock.newCondition();
063  private final Condition notFull = lock.newCondition();
064
065  public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, Scan scanForMetrics,
066    TableName name, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
067    RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
068    int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
069    ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
070    throws IOException {
071    super(configuration, scan, scanForMetrics, name, connection, rpcCallerFactory,
072      rpcControllerFactory, pool, scanReadRpcTimeout, scannerTimeout,
073      replicaCallTimeoutMicroSecondScan, connectionConfiguration, requestAttributes);
074    exceptionsQueue = new ConcurrentLinkedQueue<>();
075    final Context context = Context.current();
076    final Runnable runnable = context.wrap(new PrefetchRunnable());
077    Threads.setDaemonThreadRunning(new Thread(runnable), name + ".asyncPrefetcher");
078  }
079
080  void setPrefetchListener(Consumer<Boolean> prefetchListener) {
081    this.prefetchListener = prefetchListener;
082  }
083
084  @Override
085  protected void initCache() {
086    // Override to put a different cache in place of the super's -- a concurrent one.
087    cache = new LinkedBlockingQueue<>();
088    maxCacheSize = resultSize2CacheSize(maxScannerResultSize);
089    cacheSizeInBytes = new AtomicLong(0);
090  }
091
092  private long resultSize2CacheSize(long maxResultSize) {
093    // * 2 if possible
094    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
095  }
096
097  @Override
098  public Result next() throws IOException {
099    try (Scope ignored = span.makeCurrent()) {
100      lock.lock();
101      try {
102        while (cache.isEmpty()) {
103          handleException();
104          if (this.closed) {
105            return null;
106          }
107          try {
108            notEmpty.await();
109          } catch (InterruptedException e) {
110            span.recordException(e);
111            throw new InterruptedIOException("Interrupted when wait to load cache");
112          }
113        }
114        Result result = pollCache();
115        if (prefetchCondition()) {
116          notFull.signalAll();
117        }
118        return result;
119      } finally {
120        lock.unlock();
121        handleException();
122      }
123    }
124  }
125
126  @Override
127  public void close() {
128    lock.lock();
129    try {
130      super.close();
131      closed = true;
132      notFull.signalAll();
133      notEmpty.signalAll();
134    } finally {
135      lock.unlock();
136    }
137  }
138
139  @Override
140  protected void addEstimatedSize(long estimatedSize) {
141    cacheSizeInBytes.addAndGet(estimatedSize);
142  }
143
144  private void handleException() throws IOException {
145    // The prefetch task running in the background puts any exception it
146    // catches into this exception queue.
147    // Rethrow the exception so the application can handle it.
148    while (!exceptionsQueue.isEmpty()) {
149      Exception first = exceptionsQueue.peek();
150      first.printStackTrace();
151      if (first instanceof IOException) {
152        throw (IOException) first;
153      }
154      throw (RuntimeException) first;
155    }
156  }
157
158  private boolean prefetchCondition() {
159    return cacheSizeInBytes.get() < maxCacheSize / 2;
160  }
161
162  private Result pollCache() {
163    Result res = cache.poll();
164    long estimatedSize = calcEstimatedSize(res);
165    addEstimatedSize(-estimatedSize);
166    return res;
167  }
168
169  private class PrefetchRunnable implements Runnable {
170
171    @Override
172    public void run() {
173      while (!closed) {
174        boolean succeed = false;
175        try {
176          lock.lock();
177          while (!prefetchCondition()) {
178            notFull.await();
179          }
180          loadCache();
181          succeed = true;
182        } catch (Exception e) {
183          exceptionsQueue.add(e);
184          span.recordException(e);
185        } finally {
186          notEmpty.signalAll();
187          lock.unlock();
188          if (prefetchListener != null) {
189            prefetchListener.accept(succeed);
190          }
191        }
192      }
193    }
194  }
195
196}