/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;

@Category({ IOTests.class, SmallTests.class })
public class TestHalfStoreFileReader {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHalfStoreFileReader.class);

  private static HBaseTestingUtil TEST_UTIL;

  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    TEST_UTIL = new HBaseTestingUtil();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.cleanupTestDir();
  }

  /**
   * Test the scanner and reseek of a half hfile scanner. The scanner API demands that seekTo and
   * reseekTo() return < 0 only if the key lies before the start of the file (leaving the scanner
   * unpositioned), 0 on an exact match (rare), and 1 on an inexact match. The inexact case is by
   * far the most common, so we should generally be returning 1, and when we do there may or may
   * not be a 'next' in the scanner/file. A bug in the half file scanner returned -1 at the end of
   * the bottom half, which left the layers above with a null position and caused NPEs and other
   * problems. This test reproduces that failure, and also exercises both the bottom and top halves
   * of the file while we are at it.
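   * <p>
   * For reference, a minimal illustrative sketch of the return-value contract exercised below
   * ({@code scanner} and {@code key} are placeholders; this paraphrases the HFileScanner javadoc
   * rather than quoting it):
   *
   * <pre>{@code
   * int ret = scanner.seekTo(key); // reseekTo(key) follows the same contract
   * // ret < 0  : key sorts before the first key of the (half) file; scanner is left unpositioned
   * // ret == 0 : exact match; scanner is positioned on the matching cell
   * // ret == 1 : inexact match; scanner is positioned, and there may or may not be a 'next'
   * }</pre>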
   */
  @Test
  public void testHalfScanAndReseek() throws Exception {
    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
    Configuration conf = TEST_UTIL.getConfiguration();
    FileSystem fs = FileSystem.get(conf);
    String root_dir = TEST_UTIL.getDataTestDir().toString();
    Path parentPath = new Path(new Path(root_dir, "parent"), "CF");
    fs.mkdirs(parentPath);
    String tableName = Paths.get(root_dir).getFileName().toString();
    RegionInfo splitAHri = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
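    // The region id (and hence the region name) defaults to the current time in milliseconds, so
    // pause briefly to make sure the two split regions get distinct names.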
    Thread.sleep(1000);
    RegionInfo splitBHri = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
    Path splitAPath = new Path(new Path(root_dir, splitAHri.getRegionNameAsString()), "CF");
    Path splitBPath = new Path(new Path(root_dir, splitBHri.getRegionNameAsString()), "CF");
    Path filePath = StoreFileWriter.getUniqueFile(fs, parentPath);
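    // Use a file-backed bucket cache, cache-on-write and a pooled ByteBuffAllocator so that reads
    // of the half files go through ref-counted buffers; the leak listener registered below counts
    // any ByteBuffs that are never released.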
    String ioEngineName = "file:" + TEST_UTIL.getDataTestDir() + "/bucketNoRecycler.cache";
    BucketCache bucketCache = new BucketCache(ioEngineName, 32 * 1024 * 1024, 1024,
      new int[] { 4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024 }, 1, 1, null);
    conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true);
    conf.setInt(BUFFER_SIZE_KEY, 1024);
    ByteBuffAllocator allocator = ByteBuffAllocator.create(conf, true);

    final AtomicInteger counter = new AtomicInteger();
    RefCnt.detector.setLeakListener(new ResourceLeakDetector.LeakListener() {
      @Override
      public void onLeak(String s, String s1) {
        counter.incrementAndGet();
      }
    });

    ColumnFamilyDescriptorBuilder cfBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("CF"));
    CacheConfig cacheConf = new CacheConfig(conf, cfBuilder.build(), bucketCache, allocator);

    HFileContext meta = new HFileContextBuilder().withBlockSize(1024).build();
    HFile.Writer w =
      HFile.getWriterFactory(conf, cacheConf).withPath(fs, filePath).withFileContext(meta).create();

    // write some things.
    List<KeyValue> items = genSomeKeys();
    for (KeyValue kv : items) {
      w.append(kv);
    }
    w.close();

    HFile.Reader r = HFile.createReader(fs, filePath, cacheConf, true, conf);
    Cell midKV = r.midKey().get();
    byte[] midkey = CellUtil.cloneRow(midKV);

    Path splitFileA = new Path(splitAPath, filePath.getName() + ".parent");
    Path splitFileB = new Path(splitBPath, filePath.getName() + ".parent");

    HRegionFileSystem splitAregionFS =
      HRegionFileSystem.create(conf, fs, new Path(root_dir), splitAHri);
    StoreContext splitAStoreContext =
      StoreContext.getBuilder().withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("CF"))
        .withFamilyStoreDirectoryPath(splitAPath).withRegionFileSystem(splitAregionFS).build();
    StoreFileTracker splitAsft = StoreFileTrackerFactory.create(conf, false, splitAStoreContext);
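    // A Reference carries no cell data; it records the split row and which half of the parent
    // hfile it covers: Range.bottom for cells that sort before the split row, Range.top for the
    // rest.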
    Reference bottom = new Reference(midkey, Reference.Range.bottom);
    splitAsft.createReference(bottom, splitFileA);
    doTestOfScanAndReseek(splitFileA, fs, bottom, cacheConf);

    HRegionFileSystem splitBregionFS =
      HRegionFileSystem.create(conf, fs, new Path(root_dir), splitBHri);
    StoreContext splitBStoreContext =
      StoreContext.getBuilder().withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("CF"))
        .withFamilyStoreDirectoryPath(splitBPath).withRegionFileSystem(splitBregionFS).build();
    StoreFileTracker splitBsft = StoreFileTrackerFactory.create(conf, false, splitBStoreContext);
    Reference top = new Reference(midkey, Reference.Range.top);
    splitBsft.createReference(top, splitFileB);
    doTestOfScanAndReseek(splitFileB, fs, top, cacheConf);

    r.close();

    assertEquals(0, counter.get());
  }

  private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference reference,
    CacheConfig cacheConf) throws Exception {
    Path referencePath = StoreFileInfo.getReferredToFile(p);
    FSDataInputStreamWrapper in = new FSDataInputStreamWrapper(fs, referencePath, false, 0);
    FileStatus status = fs.getFileStatus(referencePath);
    long length = status.getLen();
    ReaderContextBuilder contextBuilder =
      new ReaderContextBuilder().withInputStreamWrapper(in).withFileSize(length)
        .withReaderType(ReaderContext.ReaderType.PREAD).withFileSystem(fs).withFilePath(p);
    ReaderContext context = contextBuilder.build();
    StoreFileInfo storeFileInfo =
      new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, fs.getFileStatus(p), reference);
    storeFileInfo.initHFileInfo(context);
    final HalfStoreFileReader halfreader =
      (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
    storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
    halfreader.loadFileInfo();
    try (HFileScanner scanner = halfreader.getScanner(false, false, false)) {

      scanner.seekTo();
      Cell curr;
      do {
        curr = scanner.getCell();
        KeyValue reseekKv = getLastOnCol(curr);
        int ret = scanner.reseekTo(reseekKv);
        assertTrue("reseek to returned: " + ret, ret > 0);
        // System.out.println(curr + ": " + ret);
      } while (scanner.next());

      int ret = scanner.reseekTo(getLastOnCol(curr));
      // System.out.println("Last reseek: " + ret);
      assertTrue(ret > 0);
    }

    halfreader.close(true);

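    // Encourage a GC cycle so that any unreleased ByteBuffs are reported to the netty leak
    // detector (and counted by the listener registered in testHalfScanAndReseek) before the
    // caller checks the leak counter.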
    System.gc();
    Thread.sleep(1000);
  }

  // Tests seekBefore on scanners opened over the two halves (HalfStoreFileReader) of an HFile.
  @Test
  public void testHalfScanner() throws IOException {
    String root_dir = TEST_UTIL.getDataTestDir().toString();
    Path p = new Path(root_dir, "test");
    Configuration conf = TEST_UTIL.getConfiguration();
    FileSystem fs = FileSystem.get(conf);
    CacheConfig cacheConf = new CacheConfig(conf);
    HFileContext meta = new HFileContextBuilder().withBlockSize(1024).build();
    HFile.Writer w =
      HFile.getWriterFactory(conf, cacheConf).withPath(fs, p).withFileContext(meta).create();

    // write some things.
    List<KeyValue> items = genSomeKeys();
    for (KeyValue kv : items) {
      w.append(kv);
    }
    w.close();

    HFile.Reader r = HFile.createReader(fs, p, cacheConf, true, conf);
    ExtendedCell midKV = r.midKey().get();
    byte[] midkey = CellUtil.cloneRow(midKV);

    Reference bottom = new Reference(midkey, Reference.Range.bottom);
    Reference top = new Reference(midkey, Reference.Range.top);

    // Ugly code to get the item before the midkey
    KeyValue beforeMidKey = null;
    for (KeyValue item : items) {
      if (CellComparatorImpl.COMPARATOR.compare(item, midKV) >= 0) {
        break;
      }
      beforeMidKey = item;
    }
    System.out.println("midkey: " + midKV + " or: " + Bytes.toStringBinary(midkey));
    System.out.println("beforeMidKey: " + beforeMidKey);

    // Seek before the splitKey on the bottom half; the splitKey itself belongs to the top half,
    // not the bottom, so we should land on the last item before the midkey.
    Cell foundKeyValue = doTestOfSeekBefore(p, fs, bottom, midKV, cacheConf);
    assertEquals(beforeMidKey, foundKeyValue);

    // Seeking before the last item should return the penultimate item on the top half, and the
    // item before the midkey on the bottom half.
    foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(items.size() - 1), cacheConf);
    assertEquals(items.get(items.size() - 2), foundKeyValue);

    foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(items.size() - 1), cacheConf);
    assertEquals(beforeMidKey, foundKeyValue);

    // Try and seek before something that is in the bottom.
    foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(0), cacheConf);
    assertNull(foundKeyValue);

    // Try and seek before the first thing.
    foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(0), cacheConf);
    assertNull(foundKeyValue);

    // Try and seek before the second thing in the top and bottom.
    foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(1), cacheConf);
    assertNull(foundKeyValue);

    foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(1), cacheConf);
    assertEquals(items.get(0), foundKeyValue);

    // Try to seek before the splitKey in the top file
    foundKeyValue = doTestOfSeekBefore(p, fs, top, midKV, cacheConf);
    assertNull(foundKeyValue);
  }

  private Cell doTestOfSeekBefore(Path p, FileSystem fs, Reference reference,
    ExtendedCell seekBefore, CacheConfig cacheConfig) throws IOException {
    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, p).build();
    StoreFileInfo storeFileInfo =
      new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, fs.getFileStatus(p), reference);
    storeFileInfo.initHFileInfo(context);
    final HalfStoreFileReader halfreader =
      (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConfig);
    storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
    halfreader.loadFileInfo();
    try (HFileScanner scanner = halfreader.getScanner(false, false, false)) {
      scanner.seekBefore(seekBefore);
      if (scanner.getCell() != null) {
        return KeyValueUtil.copyToNewKeyValue(scanner.getCell());
      } else {
        return null;
      }
    }
  }

  private KeyValue getLastOnCol(Cell curr) {
    return KeyValueUtil.createLastOnRow(curr.getRowArray(), curr.getRowOffset(),
      curr.getRowLength(), curr.getFamilyArray(), curr.getFamilyOffset(), curr.getFamilyLength(),
      curr.getQualifierArray(), curr.getQualifierOffset(), curr.getQualifierLength());
  }

  static final int SIZE = 1000;

  static byte[] _b(String s) {
    return Bytes.toBytes(s);
  }

  List<KeyValue> genSomeKeys() {
    List<KeyValue> ret = new ArrayList<>(SIZE);
    for (int i = 0; i < SIZE; i++) {
      KeyValue kv =
        new KeyValue(_b(String.format("row_%04d", i)), _b("family"), _b("qualifier"), 1000, // timestamp
          _b("value"));
      ret.add(kv);
    }
    return ret;
  }
}