/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Queue;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

/**
 * A ResultScanner which only sends a request to the RS when {@link #next()} is called and there
 * are no cached results, just like the ResultScanner of old. Mainly used for writing UTs, so that
 * we can control when a request is sent to the RS. The default ResultScanner implementation
 * fetches in the background.
 * <p>
 * All state shared between the caller of {@link #next()} / {@link #close()} and the scan callbacks
 * is guarded by this object's monitor; {@link #next()} blocks in {@code wait()} and is woken by
 * {@code notifyAll()} from {@link #onNext}, {@link #onComplete()}, {@link #onError} and
 * {@link #close()}.
 */
@InterfaceAudience.Private
public class ScanPerNextResultScanner implements ResultScanner, AdvancedScanResultConsumer {

  private final AsyncTable<AdvancedScanResultConsumer> table;

  private final Scan scan;

  /** Results handed over by the callbacks that have not yet been returned from {@link #next()}. */
  private final Queue<Result> queue = new ArrayDeque<>();

  private ScanMetrics scanMetrics;

  /** Set by {@link #onComplete()} and by {@link #close()}; never reset. */
  private boolean closed = false;

  /** Set by {@link #onError(Throwable)}; never reset. */
  private Throwable error;

  /** Handle obtained from {@code controller.suspend()} in {@link #onNext}; null otherwise. */
  private ScanResumer resumer;

  public ScanPerNextResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan) {
    this.table = table;
    this.scan = scan;
  }

  /** Records the failure and wakes up any thread blocked in {@link #next()}. */
  @Override
  public synchronized void onError(Throwable error) {
    this.error = error;
    notifyAll();
  }

  /** Marks the scanner as finished and wakes up any thread blocked in {@link #next()}. */
  @Override
  public synchronized void onComplete() {
    closed = true;
    notifyAll();
  }

  @Override
  public void onScanMetricsCreated(ScanMetrics scanMetrics) {
    this.scanMetrics = scanMetrics;
  }

  /**
   * Queues the delivered results, wakes up {@link #next()} and suspends the scan so that nothing
   * more is fetched until {@link #next()} runs out of queued results. If the scanner has already
   * been closed the scan is terminated instead.
   */
  @Override
  public synchronized void onNext(Result[] results, ScanController controller) {
    assert results.length > 0;
    if (closed) {
      controller.terminate();
      return;
    }
    for (Result result : results) {
      queue.add(result);
    }
    notifyAll();
    resumer = controller.suspend();
  }

  /**
   * Terminates the scan if the scanner has been closed; otherwise, when the scan asks for cursor
   * results and the controller has a cursor, queues a cursor result.
   * <p>
   * Note that this neither notifies nor suspends: a thread blocked in {@link #next()} is not woken
   * by a heartbeat alone, so a queued cursor result is only handed out after a later
   * {@code notifyAll()}.
   */
  @Override
  public synchronized void onHeartbeat(ScanController controller) {
    if (closed) {
      controller.terminate();
      return;
    }
    if (scan.isNeedCursorResult()) {
      controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c)));
    }
  }

  /**
   * Returns the next queued result, requesting more data only when the queue is empty.
   * <p>
   * When the queue is empty a suspended scan is resumed, or the scan is started if there is no
   * resumer, and the call then blocks until a result is queued, the scanner is closed/completed,
   * or an error is reported. Once the scanner is closed/completed or has failed, no further scan
   * is resumed or started.
   * @return the next result, or {@code null} if the queue is empty and the scanner is closed or
   *         the scan has completed
   * @throws IOException            the error reported through {@link #onError(Throwable)},
   *                                rethrown as is when it is an {@code IOException} and wrapped
   *                                otherwise (runtime exceptions and errors propagate unchanged)
   * @throws InterruptedIOException if the calling thread is interrupted while waiting
   */
  @Override
  public synchronized Result next() throws IOException {
    boolean requested = false;
    while (queue.isEmpty()) {
      // Check the terminal states before asking for more data. Otherwise every call made after
      // the scan has finished, failed or been closed would find resumer == null and start a brand
      // new scan just to return null or rethrow the same error.
      if (closed) {
        return null;
      }
      if (error != null) {
        Throwables.propagateIfPossible(error, IOException.class);
        throw new IOException(error);
      }
      if (!requested) {
        requested = true;
        requestMore();
        // Re-check the state before waiting, in case a callback already ran.
        continue;
      }
      try {
        wait();
      } catch (InterruptedException e) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(e);
        throw iioe;
      }
    }
    return queue.poll();
  }

  /** Resumes the suspended scan if there is one, otherwise starts the scan. */
  private void requestMore() {
    ScanResumer pending = resumer;
    if (pending != null) {
      // Clear the field first so that a resumer installed by a callback running from within
      // resume() cannot be overwritten with null afterwards.
      resumer = null;
      pending.resume();
    } else {
      table.scan(scan, this);
    }
  }

  /**
   * Marks the scanner as closed, drops all queued results, resumes a suspended scan (so that the
   * next callback sees {@code closed} and terminates it) and wakes up any thread blocked in
   * {@link #next()}.
   */
  @Override
  public synchronized void close() {
    closed = true;
    queue.clear();
    ScanResumer pending = resumer;
    if (pending != null) {
      resumer = null;
      pending.resume();
    }
    notifyAll();
  }

  @Override
  public boolean renewLease() {
    // The renew lease operation will be handled in background
    return false;
  }

  @Override
  public ScanMetrics getScanMetrics() {
    return scanMetrics;
  }
}