/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test cases that ensure that file system level errors are bubbled up appropriately to clients,
 * rather than swallowed.
 */
@Category({ RegionServerTests.class, LargeTests.class })
public class TestFSErrorsExposed {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFSErrorsExposed.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSErrorsExposed.class);

  HBaseTestingUtil util = new HBaseTestingUtil();

  @Rule
  public TestName name = new TestName();

  /**
   * Injects errors into the pread calls of an on-disk file, and makes sure those bubble up to the
   * HFile scanner
   */
  @Test
  public void testHFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(
      new Path(util.getDataTestDir("internalScannerExposesErrors"), "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    FileSystem fs = new HFileSystem(faultyfs);
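    // The store file is written below through the plain 'hfs'; reads go through 'fs', whose
    // FaultyFileSystem wrapper records every stream it opens so faults can later be injected
    // into the pread path.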
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(util.getConfiguration(), cacheConf, hfs)
      .withOutputDir(hfilePath).withFileContext(meta).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(util.getConfiguration(),
      fs, writer.getPath(), true);
    HStoreFile sf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
    sf.initReader();
    StoreFileReader reader = sf.getReader();
    try (HFileScanner scanner = reader.getScanner(false, true, false)) {
      FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
      assertNotNull(inStream);

      scanner.seekTo();
      // Do at least one successful read
      assertTrue(scanner.next());

      faultyfs.startFaults();

      try {
        int scanned = 0;
        while (scanner.next()) {
          scanned++;
        }
        fail("Scanner didn't throw after faults injected");
      } catch (IOException ioe) {
        LOG.info("Got expected exception", ioe);
        assertTrue(ioe.getMessage().contains("Fault"));
      }
    }
    reader.close(true); // end of test so evictOnClose
  }

  /**
   * Injects errors into the pread calls of an on-disk file, and makes sure those bubble up to the
   * StoreFileScanner
   */
  @Test
  public void testStoreFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(
      new Path(util.getDataTestDir("internalScannerExposesErrors"), "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    HFileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(util.getConfiguration(), cacheConf, hfs)
      .withOutputDir(hfilePath).withFileContext(meta).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(util.getConfiguration(),
      fs, writer.getPath(), true);
    HStoreFile sf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);

    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
      Collections.singletonList(sf), false, true, false, false,
      // 0 is passed as readpoint because this test operates on HStoreFile directly
      0);
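    // Unlike the HFile test above, sf.initReader() is not called explicitly here; the
    // StoreFileScanner helper takes care of opening the reader, and pread is enabled so reads
    // stay on the positional-read path that FaultyInputStream intercepts.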
    try {
      KeyValueScanner scanner = scanners.get(0);

      FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
      assertNotNull(inStream);

      scanner.seek(KeyValue.LOWESTKEY);
      // Do at least one successful read
      assertNotNull(scanner.next());
      faultyfs.startFaults();

      try {
        int scanned = 0;
        while (scanner.next() != null) {
          scanned++;
        }
        fail("Scanner didn't throw after faults injected");
      } catch (IOException ioe) {
        LOG.info("Got expected exception", ioe);
        assertTrue(ioe.getMessage().contains("Could not iterate"));
      }
    } finally {
      for (StoreFileScanner scanner : scanners) {
        scanner.close();
      }
    }
  }

  /**
   * Cluster test which starts a region server with a region, then removes the data from HDFS
   * underneath it, and ensures that errors are bubbled to the client.
   */
  @Test
  public void testFullSystemBubblesFSErrors() throws Exception {
    // With short-circuit reads enabled, losing the datanodes does not produce an error; that is
    // a known 'feature', so skip this test in that case.
    Assume.assumeTrue(!util.isReadShortCircuitOn());

    try {
      // Make it fail faster.
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 90000);
      util.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
      util.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
      util.startMiniCluster(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      byte[] fam = Bytes.toBytes("fam");

      Admin admin = util.getAdmin();
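      // The family disables the block cache so that, once the datanodes are gone, reads cannot
      // be served from memory and have to surface the underlying HDFS error.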
      TableDescriptor tableDescriptor =
        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
          .newBuilder(fam).setMaxVersions(1).setBlockCacheEnabled(false).build()).build();
      admin.createTable(tableDescriptor);

      // The connection from the test util picks up the reduced retry settings above; with the
      // default retry count the failure below would take much longer to surface.
      try (Table table = util.getConnection().getTable(tableName)) {
        // Load some data
        util.loadTable(table, fam, false);
        util.flush();
        HBaseTestingUtil.countRows(table);

        // Kill the DFS cluster
        util.getDFSCluster().shutdownDataNodes();

        try {
          HBaseTestingUtil.countRows(table);
          fail("Did not fail to count after removing data");
        } catch (Exception e) {
          LOG.info("Got expected error", e);
          assertTrue(e.getMessage().contains("Could not seek"));
        }
      }

      // Restart data nodes so that HBase can shut down cleanly.
      util.getDFSCluster().restartDataNodes();

    } finally {
      SingleProcessHBaseCluster cluster = util.getMiniHBaseCluster();
      if (cluster != null) {
        cluster.killAll();
      }
      util.shutdownMiniCluster();
    }
  }

  static class FaultyFileSystem extends FilterFileSystem {
    List<SoftReference<FaultyInputStream>> inStreams = new ArrayList<>();

    public FaultyFileSystem(FileSystem testFileSystem) {
      super(testFileSystem);
    }

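    // Wrap every stream opened through this filesystem so the test can later flip it into fault
    // mode; SoftReferences are used so this registry does not keep streams alive on its own.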
    @Override
    public FSDataInputStream open(Path p, int bufferSize) throws IOException {
      FSDataInputStream orig = fs.open(p, bufferSize);
      FaultyInputStream faulty = new FaultyInputStream(orig);
      inStreams.add(new SoftReference<>(faulty));
      return faulty;
    }

    /**
     * Starts to simulate faults on all streams opened so far
     */
    public void startFaults() {
      for (SoftReference<FaultyInputStream> is : inStreams) {
        is.get().startFaults();
      }
    }
  }

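  /**
   * Pass-through FSDataInputStream whose pread calls start throwing once {@link #startFaults()}
   * has been called.
   */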
  static class FaultyInputStream extends FSDataInputStream {
    boolean faultsStarted = false;

    public FaultyInputStream(InputStream in) throws IOException {
      super(in);
    }

    public void startFaults() {
      faultsStarted = true;
    }

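    // Check for an injected fault before delegating, so no bytes are returned once faults start.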
    @Override
    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
      injectFault();
      return ((PositionedReadable) in).read(position, buffer, offset, length);
    }

    private void injectFault() throws IOException {
      if (faultsStarted) {
        throw new IOException("Fault injected");
      }
    }
  }
}