/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks that a client-side scan still returns every row, in order, when the region-side
 * {@code preScannerNext} hook throws an {@link IOException} on alternate invocations.
 * <p>
 * The faults are injected by {@link FaultyScannerObserver}, which is registered as a region
 * coprocessor through the cluster configuration before the mini cluster is started.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestScannerRetriableFailure {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestScannerRetriableFailure.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestScannerRetriableFailure.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private static final String FAMILY_NAME_STR = "f";
  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);

  @Rule
  public TableNameTestRule testTable = new TableNameTestRule();

  /**
   * Region observer that makes every other {@code preScannerNext} call on a non-system table fail
   * with an {@link IOException}, starting with the first call it sees.
   */
  public static class FaultyScannerObserver implements RegionCoprocessor, RegionObserver {
    /** Number of {@code preScannerNext} calls seen so far on non-system tables. */
    private int faults = 0;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    /**
     * Throws on calls number 0, 2, 4, ... against non-system tables; otherwise passes
     * {@code hasMore} through unchanged.
     * @throws IOException the injected fault
     */
    @Override
    public boolean preScannerNext(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore)
      throws IOException {
      final TableName tableName = e.getEnvironment().getRegionInfo().getTable();
      // The short-circuit '&&' means the counter only advances for non-system tables, so scans
      // of system tables neither fail nor shift the even/odd pattern.
      if (!tableName.isSystemTable() && (faults++ % 2) == 0) {
        LOG.debug(" Injecting fault in table={} scanner", tableName);
        throw new IOException("injected fault");
      }
      return hasMore;
    }
  }

  /**
   * Sets the store compaction/blocking thresholds and installs {@link FaultyScannerObserver} as a
   * region coprocessor.
   * @param conf configuration to modify in place
   */
  private static void setupConf(Configuration conf) {
    // NOTE(review): presumably these high thresholds keep compactions and flush blocking out of
    // the way of the test -- confirm against the store configuration documentation.
    conf.setLong("hbase.hstore.compaction.min", 20);
    conf.setLong("hbase.hstore.compaction.max", 39);
    conf.setLong("hbase.hstore.blockingStoreFiles", 40);

    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, FaultyScannerObserver.class.getName());
  }

  @BeforeClass
  public static void setup() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    try {
      UTIL.shutdownMiniCluster();
    } catch (Exception e) {
      // Best effort: a shutdown problem is logged but must not fail the test class.
      LOG.warn("failure shutting down cluster", e);
    }
  }

  /**
   * Loads a fresh table and verifies that a full scan sees every loaded row despite the faults
   * injected by {@link FaultyScannerObserver}.
   */
  @Test
  public void testFaultyScanner() throws Exception {
    final int numRows = 100;
    TableName tableName = testTable.getTableName();
    try (Table table = UTIL.createTable(tableName, FAMILY_NAME)) {
      loadTable(table, numRows);
      checkTableRows(table, numRows);
    }
  }

  // ==========================================================================
  // Helpers
  // ==========================================================================
  /** Returns the file system of the master's {@code MasterFileSystem}. Not used in this file. */
  private FileSystem getFileSystem() {
    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
  }

  /** Returns the root dir of the master's {@code MasterFileSystem}. Not used in this file. */
  private Path getRootDir() {
    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
  }

  /**
   * Builds the row key for row index {@code i}: the index zero-padded to nine digits. Shared by
   * the loader and the checker so both always agree on the key format.
   */
  private static byte[] rowKey(int i) {
    return Bytes.toBytes(String.format("%09d", i));
  }

  /**
   * Writes {@code numRows} rows to {@code table}. Each row has one cell in {@code FAMILY_NAME}
   * with a null qualifier whose value equals the row key; the puts skip the WAL.
   * @param table   table to write to
   * @param numRows number of rows to write, keyed {@code 0 .. numRows - 1}
   * @throws IOException if the write fails
   */
  public void loadTable(final Table table, int numRows) throws IOException {
    List<Put> puts = new ArrayList<>(numRows);
    for (int i = 0; i < numRows; ++i) {
      byte[] row = rowKey(i);
      Put put = new Put(row);
      put.setDurability(Durability.SKIP_WAL);
      put.addColumn(FAMILY_NAME, null, row);
      puts.add(put);
    }
    // Send the collected puts as one batch instead of issuing one call per row.
    if (!puts.isEmpty()) {
      table.put(puts);
    }
  }

  /**
   * Scans {@code table} one row per fetch and asserts that exactly {@code numRows} rows come back,
   * with the first {@code numRows} keys matching {@link #rowKey(int)} in ascending index order.
   * @param table   table to scan
   * @param numRows expected number of rows
   */
  private void checkTableRows(final Table table, int numRows) throws Exception {
    Scan scan = new Scan();
    // Caching of 1 means each row is fetched with its own scanner call, so the fault-injecting
    // hook gets many chances to fire during one scan.
    scan.setCaching(1);
    scan.setCacheBlocks(false);
    try (ResultScanner scanner = table.getScanner(scan)) {
      int count = 0;
      for (int i = 0; i < numRows; ++i) {
        Result result = scanner.next();
        assertNotNull("scanner returned no result for row index " + i, result);
        assertTrue("unexpected row key at row index " + i,
          Bytes.equals(rowKey(i), result.getRow()));
        count++;
      }

      // Drain whatever is left so that unexpected extra rows show up in the final count.
      while (scanner.next() != null) {
        count++;
      }
      assertEquals(numRows, count);
    }
  }
}