001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
import static org.apache.hadoop.hbase.KeyValueTestUtil.create;
import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.Random;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
064
/**
 * Verifies that running {@link StoreScanner#close()} concurrently with
 * {@link StoreScanner#updateReaders(List, List)} leaves no outstanding references on the readers
 * held by the existing {@link StoreScanner}.
 */
070@Category({ RegionServerTests.class, SmallTests.class })
071public class TestStoreScannerClosure {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075    HBaseClassTestRule.forClass(TestStoreScannerClosure.class);
076
077  private static final Logger LOG = LoggerFactory.getLogger(TestStoreScannerClosure.class);
078  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
079  @Rule
080  public TestName name = new TestName();
081  private static final String CF_STR = "cf";
082  private static HRegion region;
083  private static final byte[] CF = Bytes.toBytes(CF_STR);
084  static Configuration CONF = HBaseConfiguration.create();
085  private static CacheConfig cacheConf;
086  private static FileSystem fs;
087  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
088  private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestHFile").toString();
089  private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
090    KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false);
091  private final static byte[] fam = Bytes.toBytes("cf_1");
092  private static final KeyValue[] kvs =
093    new KeyValue[] { create("R1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
094      create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"),
095      create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
096      create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"),
097      create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"),
098      create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"),
099      create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"),
100      create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"),
101      create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"),
102      create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), };
103
104  @BeforeClass
105  public static void setUp() throws Exception {
106    CONF = TEST_UTIL.getConfiguration();
107    cacheConf = new CacheConfig(CONF);
108    fs = TEST_UTIL.getTestFileSystem();
109    TableName tableName = TableName.valueOf("test");
110    HTableDescriptor htd = new HTableDescriptor(tableName);
111    htd.addFamily(new HColumnDescriptor(fam));
112    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
113    Path path = TEST_UTIL.getDataTestDir("test");
114    region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
115  }
116
117  @Test
118  public void testScannerCloseAndUpdateReadersWithMemstoreScanner() throws Exception {
119    Put p = new Put(Bytes.toBytes("row"));
120    p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
121    region.put(p);
122    // create the store scanner here.
123    // for easiness, use Long.MAX_VALUE as read pt
124    try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo,
125      new Scan(), getCols("q1"), Long.MAX_VALUE)) {
126      p = new Put(Bytes.toBytes("row1"));
127      p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
128      region.put(p);
129      HStore store = region.getStore(fam);
130      // use the lock to manually get a new memstore scanner. this is what
131      // HStore#notifyChangedReadersObservers does under the lock.(lock is not needed here
132      // since it is just a testcase).
133      store.getStoreEngine().readLock();
134      final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE);
135      store.getStoreEngine().readUnlock();
136      Thread closeThread = new Thread() {
137        public void run() {
138          // close should be completed
139          scan.close(false, true);
140        }
141      };
142      closeThread.start();
143      Thread updateThread = new Thread() {
144        public void run() {
145          try {
146            // use the updated memstoreScanners and pass it to updateReaders
147            scan.updateReaders(true, Collections.emptyList(), memScanners);
148          } catch (IOException e) {
149            e.printStackTrace();
150          }
151        }
152      };
153      updateThread.start();
154      // wait for close and updateThread to complete
155      closeThread.join();
156      updateThread.join();
157      MemStoreLAB memStoreLAB;
158      for (KeyValueScanner scanner : memScanners) {
159        if (scanner instanceof SegmentScanner) {
160          memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB();
161          if (memStoreLAB != null) {
162            // There should be no unpooled chunks
163            int refCount = ((MemStoreLABImpl) memStoreLAB).getRefCntValue();
164            assertTrue("The memstore should not have unpooled chunks", refCount == 0);
165          }
166        }
167      }
168    }
169  }
170
171  @Test
172  public void testScannerCloseAndUpdateReaders1() throws Exception {
173    testScannerCloseAndUpdateReaderInternal(true, false);
174  }
175
176  @Test
177  public void testScannerCloseAndUpdateReaders2() throws Exception {
178    testScannerCloseAndUpdateReaderInternal(false, true);
179  }
180
181  private Path writeStoreFile() throws IOException {
182    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile");
183    HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build();
184    StoreFileWriter sfw = new StoreFileWriter.Builder(CONF, fs).withOutputDir(storeFileParentDir)
185      .withFileContext(meta).build();
186
187    final int rowLen = 32;
188    Random rand = ThreadLocalRandom.current();
189    for (int i = 0; i < 1000; ++i) {
190      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
191      byte[] v = RandomKeyValueUtil.randomValue(rand);
192      int cfLen = rand.nextInt(k.length - rowLen + 1);
193      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
194        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
195      sfw.append(kv);
196    }
197
198    sfw.close();
199    return sfw.getPath();
200  }
201
202  private static KeyValue.Type generateKeyType(Random rand) {
203    if (rand.nextBoolean()) {
204      // Let's make half of KVs puts.
205      return KeyValue.Type.Put;
206    } else {
207      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
208      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
209        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
210          + "Probably the layout of KeyValue.Type has changed.");
211      }
212      return keyType;
213    }
214  }
215
216  private HStoreFile readStoreFile(Path storeFilePath, Configuration conf) throws Exception {
217    // Open the file reader with block cache disabled.
218    HStoreFile file = new HStoreFile(fs, storeFilePath, conf, cacheConf, BloomType.NONE, true);
219    return file;
220  }
221
222  private void testScannerCloseAndUpdateReaderInternal(boolean awaitUpdate, boolean awaitClose)
223    throws IOException, InterruptedException {
224    // start write to store file.
225    Path path = writeStoreFile();
226    HStoreFile file = null;
227    List<HStoreFile> files = new ArrayList<HStoreFile>();
228    try {
229      file = readStoreFile(path, CONF);
230      files.add(file);
231    } catch (Exception e) {
232      // fail test
233      assertTrue(false);
234    }
235    scanFixture(kvs);
236    // scanners.add(storeFileScanner);
237    try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo,
238      new Scan(), getCols("a", "d"), 100L)) {
239      Thread closeThread = new Thread() {
240        public void run() {
241          scan.close(awaitClose, true);
242        }
243      };
244      closeThread.start();
245      Thread updateThread = new Thread() {
246        public void run() {
247          try {
248            scan.updateReaders(awaitUpdate, files, Collections.emptyList());
249          } catch (IOException e) {
250            e.printStackTrace();
251          }
252        }
253      };
254      updateThread.start();
255      // complete both the threads
256      closeThread.join();
257      // complete both the threads
258      updateThread.join();
259      if (file.getReader() != null) {
260        // the fileReader is not null when the updateReaders has completed first.
261        // in the other case the fileReader will be null.
262        int refCount = file.getReader().getRefCount();
263        LOG.info("the store scanner count is " + refCount);
264        assertTrue("The store scanner count should be 0", refCount == 0);
265      }
266    }
267  }
268
269  private static class ExtendedStoreScanner extends StoreScanner {
270    private CountDownLatch latch = new CountDownLatch(1);
271
272    public ExtendedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
273      NavigableSet<byte[]> columns, long readPt) throws IOException {
274      super(store, scanInfo, scan, columns, readPt);
275    }
276
277    public void updateReaders(boolean await, List<HStoreFile> sfs,
278      List<KeyValueScanner> memStoreScanners) throws IOException {
279      if (await) {
280        try {
281          latch.await();
282        } catch (InterruptedException e) {
283          // TODO Auto-generated catch block
284          e.printStackTrace();
285        }
286      }
287      super.updateReaders(sfs, memStoreScanners);
288      if (!await) {
289        latch.countDown();
290      }
291    }
292
293    // creating a dummy close
294    public void close(boolean await, boolean dummy) {
295      if (await) {
296        try {
297          latch.await();
298        } catch (InterruptedException e) {
299          // TODO Auto-generated catch block
300          e.printStackTrace();
301        }
302      }
303      super.close();
304      if (!await) {
305        latch.countDown();
306      }
307    }
308  }
309
310  NavigableSet<byte[]> getCols(String... strCols) {
311    NavigableSet<byte[]> cols = new TreeSet<>(Bytes.BYTES_COMPARATOR);
312    for (String col : strCols) {
313      byte[] bytes = Bytes.toBytes(col);
314      cols.add(bytes);
315    }
316    return cols;
317  }
318}