001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.KeyValueTestUtil.create; 021import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import java.util.NavigableSet; 029import java.util.Random; 030import java.util.TreeSet; 031import java.util.concurrent.CountDownLatch; 032import java.util.concurrent.ThreadLocalRandom; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.CellComparator; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.KeepDeletedCells; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 045import org.apache.hadoop.hbase.client.Put; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.client.RegionInfoBuilder; 048import org.apache.hadoop.hbase.client.Scan; 049import org.apache.hadoop.hbase.client.TableDescriptor; 050import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 051import org.apache.hadoop.hbase.io.hfile.CacheConfig; 052import org.apache.hadoop.hbase.io.hfile.HFileContext; 053import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 054import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; 055import org.apache.hadoop.hbase.testclassification.RegionServerTests; 056import org.apache.hadoop.hbase.testclassification.SmallTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.junit.BeforeClass; 059import org.junit.ClassRule; 060import org.junit.Rule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.junit.rules.TestName; 064import org.slf4j.Logger; 065import org.slf4j.LoggerFactory; 066 067/** 068 * This test tests whether parallel {@link StoreScanner#close()} and 069 * {@link StoreScanner#updateReaders(List, List)} works perfectly ensuring that there are no 070 * references on the existing Storescanner readers. 071 */ 072@Category({ RegionServerTests.class, SmallTests.class }) 073public class TestStoreScannerClosure { 074 075 @ClassRule 076 public static final HBaseClassTestRule CLASS_RULE = 077 HBaseClassTestRule.forClass(TestStoreScannerClosure.class); 078 079 private static final Logger LOG = LoggerFactory.getLogger(TestStoreScannerClosure.class); 080 private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; 081 @Rule 082 public TestName name = new TestName(); 083 private static final String CF_STR = "cf"; 084 private static HRegion region; 085 private static final byte[] CF = Bytes.toBytes(CF_STR); 086 static Configuration CONF = HBaseConfiguration.create(); 087 private static CacheConfig cacheConf; 088 private static FileSystem fs; 089 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 090 private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE, 091 KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false); 092 private final static byte[] fam = Bytes.toBytes("cf_1"); 093 private static final KeyValue[] kvs = 094 new KeyValue[] { create("R1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), 095 create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"), 096 create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"), 097 create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"), 098 create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"), 099 create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"), 100 create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"), 101 create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"), 102 create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"), 103 create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), }; 104 105 @BeforeClass 106 public static void setUp() throws Exception { 107 CONF = TEST_UTIL.getConfiguration(); 108 cacheConf = new CacheConfig(CONF); 109 fs = TEST_UTIL.getTestFileSystem(); 110 TableName tableName = TableName.valueOf("test"); 111 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 112 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build(); 113 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 114 Path path = TEST_UTIL.getDataTestDir("test"); 115 region = HBaseTestingUtil.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), 116 tableDescriptor); 117 } 118 119 @Test 120 public void testScannerCloseAndUpdateReadersWithMemstoreScanner() throws Exception { 121 Put p = new Put(Bytes.toBytes("row")); 122 p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val")); 123 region.put(p); 124 // create the store scanner here. 125 // for easiness, use Long.MAX_VALUE as read pt 126 try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo, 127 new Scan(), getCols("q1"), Long.MAX_VALUE)) { 128 p = new Put(Bytes.toBytes("row1")); 129 p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val")); 130 region.put(p); 131 HStore store = region.getStore(fam); 132 // use the lock to manually get a new memstore scanner. this is what 133 // HStore#notifyChangedReadersObservers does under the lock.(lock is not needed 134 // here 135 // since it is just a testcase). 136 store.getStoreEngine().readLock(); 137 final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE); 138 store.getStoreEngine().readUnlock(); 139 Thread closeThread = new Thread() { 140 public void run() { 141 // close should be completed 142 scan.close(false, true); 143 } 144 }; 145 closeThread.start(); 146 Thread updateThread = new Thread() { 147 public void run() { 148 try { 149 // use the updated memstoreScanners and pass it to updateReaders 150 scan.updateReaders(true, Collections.emptyList(), memScanners); 151 } catch (IOException e) { 152 e.printStackTrace(); 153 } 154 } 155 }; 156 updateThread.start(); 157 // wait for close and updateThread to complete 158 closeThread.join(); 159 updateThread.join(); 160 MemStoreLAB memStoreLAB; 161 for (KeyValueScanner scanner : memScanners) { 162 if (scanner instanceof SegmentScanner) { 163 memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB(); 164 if (memStoreLAB != null) { 165 // There should be no unpooled chunks 166 int refCount = ((MemStoreLABImpl) memStoreLAB).getRefCntValue(); 167 assertTrue("The memstore should not have unpooled chunks", refCount == 0); 168 } 169 } 170 } 171 } 172 } 173 174 @Test 175 public void testScannerCloseAndUpdateReaders1() throws Exception { 176 testScannerCloseAndUpdateReaderInternal(true, false); 177 } 178 179 @Test 180 public void testScannerCloseAndUpdateReaders2() throws Exception { 181 testScannerCloseAndUpdateReaderInternal(false, true); 182 } 183 184 private Path writeStoreFile() throws IOException { 185 Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile"); 186 HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build(); 187 StoreFileWriter sfw = new StoreFileWriter.Builder(CONF, fs).withOutputDir(storeFileParentDir) 188 .withFileContext(meta).build(); 189 190 final int rowLen = 32; 191 Random rand = ThreadLocalRandom.current(); 192 for (int i = 0; i < 1000; ++i) { 193 byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); 194 byte[] v = RandomKeyValueUtil.randomValue(rand); 195 int cfLen = rand.nextInt(k.length - rowLen + 1); 196 KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, 197 k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); 198 sfw.append(kv); 199 } 200 201 sfw.close(); 202 return sfw.getPath(); 203 } 204 205 private static KeyValue.Type generateKeyType(Random rand) { 206 if (rand.nextBoolean()) { 207 // Let's make half of KVs puts. 208 return KeyValue.Type.Put; 209 } else { 210 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; 211 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { 212 throw new RuntimeException("Generated an invalid key type: " + keyType + ". " 213 + "Probably the layout of KeyValue.Type has changed."); 214 } 215 return keyType; 216 } 217 } 218 219 private HStoreFile readStoreFile(StoreFileInfo fileinfo) throws Exception { 220 // Open the file reader with block cache disabled. 221 HStoreFile file = new HStoreFile(fileinfo, BloomType.NONE, cacheConf); 222 return file; 223 } 224 225 private void testScannerCloseAndUpdateReaderInternal(boolean awaitUpdate, boolean awaitClose) 226 throws IOException, InterruptedException { 227 // start write to store file. 228 Path path = writeStoreFile(); 229 HStoreFile file = null; 230 List<HStoreFile> files = new ArrayList<HStoreFile>(); 231 try { 232 StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(CONF, fs, path, true); 233 file = readStoreFile(storeFileInfo); 234 files.add(file); 235 } catch (Exception e) { 236 // fail test 237 assertTrue(false); 238 } 239 scanFixture(kvs); 240 // scanners.add(storeFileScanner); 241 try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo, 242 new Scan(), getCols("a", "d"), 100L)) { 243 Thread closeThread = new Thread() { 244 public void run() { 245 scan.close(awaitClose, true); 246 } 247 }; 248 closeThread.start(); 249 Thread updateThread = new Thread() { 250 public void run() { 251 try { 252 scan.updateReaders(awaitUpdate, files, Collections.emptyList()); 253 } catch (IOException e) { 254 e.printStackTrace(); 255 } 256 } 257 }; 258 updateThread.start(); 259 // complete both the threads 260 closeThread.join(); 261 // complete both the threads 262 updateThread.join(); 263 if (file.getReader() != null) { 264 // the fileReader is not null when the updateReaders has completed first. 265 // in the other case the fileReader will be null. 266 int refCount = file.getReader().getRefCount(); 267 LOG.info("the store scanner count is " + refCount); 268 assertTrue("The store scanner count should be 0", refCount == 0); 269 } 270 } 271 } 272 273 private static class ExtendedStoreScanner extends StoreScanner { 274 private CountDownLatch latch = new CountDownLatch(1); 275 276 public ExtendedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, 277 NavigableSet<byte[]> columns, long readPt) throws IOException { 278 super(store, scanInfo, scan, columns, readPt); 279 } 280 281 public void updateReaders(boolean await, List<HStoreFile> sfs, 282 List<KeyValueScanner> memStoreScanners) throws IOException { 283 if (await) { 284 try { 285 latch.await(); 286 } catch (InterruptedException e) { 287 // TODO Auto-generated catch block 288 e.printStackTrace(); 289 } 290 } 291 super.updateReaders(sfs, memStoreScanners); 292 if (!await) { 293 latch.countDown(); 294 } 295 } 296 297 // creating a dummy close 298 public void close(boolean await, boolean dummy) { 299 if (await) { 300 try { 301 latch.await(); 302 } catch (InterruptedException e) { 303 // TODO Auto-generated catch block 304 e.printStackTrace(); 305 } 306 } 307 super.close(); 308 if (!await) { 309 latch.countDown(); 310 } 311 } 312 } 313 314 NavigableSet<byte[]> getCols(String... strCols) { 315 NavigableSet<byte[]> cols = new TreeSet<>(Bytes.BYTES_COMPARATOR); 316 for (String col : strCols) { 317 byte[] bytes = Bytes.toBytes(col); 318 cols.add(bytes); 319 } 320 return cols; 321 } 322}