001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.NavigableSet;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicLong;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.DoNotRetryIOException;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.exceptions.ScannerResetException;
042import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner;
043import org.apache.hadoop.hbase.regionserver.HRegion;
044import org.apache.hadoop.hbase.regionserver.HStore;
045import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
046import org.apache.hadoop.hbase.regionserver.RegionServerServices;
047import org.apache.hadoop.hbase.regionserver.ReversedStoreScanner;
048import org.apache.hadoop.hbase.regionserver.ScanInfo;
049import org.apache.hadoop.hbase.regionserver.StoreScanner;
050import org.apache.hadoop.hbase.testclassification.ClientTests;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.wal.WAL;
054import org.junit.AfterClass;
055import org.junit.BeforeClass;
056import org.junit.ClassRule;
057import org.junit.Rule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.rules.TestName;
061
062@Category({ MediumTests.class, ClientTests.class })
063public class TestFromClientSideScanExcpetion {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067    HBaseClassTestRule.forClass(TestFromClientSideScanExcpetion.class);
068
069  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
070
071  private static byte[] FAMILY = Bytes.toBytes("testFamily");
072
073  private static int SLAVES = 3;
074
075  @Rule
076  public TestName name = new TestName();
077
078  @BeforeClass
079  public static void setUpBeforeClass() throws Exception {
080    Configuration conf = TEST_UTIL.getConfiguration();
081    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000);
082    conf.setClass(HConstants.REGION_IMPL, MyHRegion.class, HRegion.class);
083    conf.setBoolean("hbase.client.log.scanner.activity", true);
084    // We need more than one region server in this test
085    TEST_UTIL.startMiniCluster(SLAVES);
086  }
087
088  @AfterClass
089  public static void tearDownAfterClass() throws Exception {
090    TEST_UTIL.shutdownMiniCluster();
091  }
092
093  private static AtomicBoolean ON = new AtomicBoolean(false);
094  private static AtomicLong REQ_COUNT = new AtomicLong(0);
095  private static AtomicBoolean IS_DO_NOT_RETRY = new AtomicBoolean(false); // whether to throw
096                                                                           // DNRIOE
097  private static AtomicBoolean THROW_ONCE = new AtomicBoolean(true); // whether to only throw once
098
099  private static void reset() {
100    ON.set(false);
101    REQ_COUNT.set(0);
102    IS_DO_NOT_RETRY.set(false);
103    THROW_ONCE.set(true);
104  }
105
106  private static void inject() {
107    ON.set(true);
108  }
109
110  public static final class MyHRegion extends HRegion {
111
112    @SuppressWarnings("deprecation")
113    public MyHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
114      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
115      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
116    }
117
118    @Override
119    protected HStore instantiateHStore(ColumnFamilyDescriptor family, boolean warmup)
120      throws IOException {
121      return new MyHStore(this, family, conf, warmup);
122    }
123  }
124
125  public static final class MyHStore extends HStore {
126
127    public MyHStore(HRegion region, ColumnFamilyDescriptor family, Configuration confParam,
128      boolean warmup) throws IOException {
129      super(region, family, confParam, warmup);
130    }
131
132    @Override
133    protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
134      NavigableSet<byte[]> targetCols, long readPt) throws IOException {
135      return scan.isReversed()
136        ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
137        : new MyStoreScanner(this, scanInfo, scan, targetCols, readPt);
138    }
139  }
140
141  public static final class MyStoreScanner extends StoreScanner {
142    public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
143      long readPt) throws IOException {
144      super(store, scanInfo, scan, columns, readPt);
145    }
146
147    @Override
148    protected List<KeyValueScanner> selectScannersFrom(HStore store,
149      List<? extends KeyValueScanner> allScanners) {
150      List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners);
151      List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
152      for (KeyValueScanner scanner : scanners) {
153        newScanners.add(new DelegatingKeyValueScanner(scanner) {
154          @Override
155          public boolean reseek(Cell key) throws IOException {
156            if (ON.get()) {
157              REQ_COUNT.incrementAndGet();
158              if (!THROW_ONCE.get() || REQ_COUNT.get() == 1) {
159                if (IS_DO_NOT_RETRY.get()) {
160                  throw new DoNotRetryIOException("Injected exception");
161                } else {
162                  throw new IOException("Injected exception");
163                }
164              }
165            }
166            return super.reseek(key);
167          }
168        });
169      }
170      return newScanners;
171    }
172  }
173
174  /**
175   * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek leaving
176   * the server side RegionScanner to be in dirty state. The client has to ensure that the
177   * ClientScanner does not get an exception and also sees all the data.
178   */
179  @Test
180  public void testClientScannerIsResetWhenScanThrowsIOException()
181    throws IOException, InterruptedException {
182    reset();
183    THROW_ONCE.set(true); // throw exceptions only once
184    TableName tableName = TableName.valueOf(name.getMethodName());
185    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
186      int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
187      TEST_UTIL.getAdmin().flush(tableName);
188      inject();
189      int actualRowCount = TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
190      assertEquals(rowCount, actualRowCount);
191    }
192    assertTrue(REQ_COUNT.get() > 0);
193  }
194
195  /**
196   * Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation
197   * is that the exception will bubble up to the client scanner instead of being retried.
198   */
199  @Test
200  public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE()
201    throws IOException, InterruptedException {
202    reset();
203    IS_DO_NOT_RETRY.set(true);
204    TableName tableName = TableName.valueOf(name.getMethodName());
205    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
206      TEST_UTIL.loadTable(t, FAMILY, false);
207      TEST_UTIL.getAdmin().flush(tableName);
208      inject();
209      TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
210      fail("Should have thrown an exception");
211    } catch (DoNotRetryIOException expected) {
212      // expected
213    }
214    assertTrue(REQ_COUNT.get() > 0);
215  }
216
217  /**
218   * Tests the case where a coprocessor throws a regular IOException in the scan. The expectation is
219   * that the we will keep on retrying, but fail after the retries are exhausted instead of retrying
220   * indefinitely.
221   */
222  @Test
223  public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE()
224    throws IOException, InterruptedException {
225    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
226    TableName tableName = TableName.valueOf(name.getMethodName());
227    reset();
228    THROW_ONCE.set(false); // throw exceptions in every retry
229    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
230      TEST_UTIL.loadTable(t, FAMILY, false);
231      TEST_UTIL.getAdmin().flush(tableName);
232      inject();
233      TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
234      fail("Should have thrown an exception");
235    } catch (DoNotRetryIOException expected) {
236      assertThat(expected, instanceOf(ScannerResetException.class));
237      // expected
238    }
239    assertTrue(REQ_COUNT.get() >= 3);
240  }
241
242}