001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.NavigableSet;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellComparatorImpl;
034import org.apache.hadoop.hbase.CompareOperator;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.KeepDeletedCells;
039import org.apache.hadoop.hbase.KeyValue;
040import org.apache.hadoop.hbase.KeyValueUtil;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.client.TableDescriptor;
047import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
048import org.apache.hadoop.hbase.filter.Filter;
049import org.apache.hadoop.hbase.filter.FilterList;
050import org.apache.hadoop.hbase.filter.FilterList.Operator;
051import org.apache.hadoop.hbase.filter.PageFilter;
052import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
053import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
054import org.apache.hadoop.hbase.io.hfile.CacheConfig;
055import org.apache.hadoop.hbase.io.hfile.HFileContext;
056import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.testclassification.RegionServerTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
061import org.apache.hadoop.hbase.util.Pair;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Rule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.junit.rules.TestName;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
072
073/**
074 * Test cases against ReversibleKeyValueScanner
075 */
076@Category({ RegionServerTests.class, MediumTests.class })
077public class TestReversibleScanners {
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081    HBaseClassTestRule.forClass(TestReversibleScanners.class);
082
083  private static final Logger LOG = LoggerFactory.getLogger(TestReversibleScanners.class);
084  HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
085
086  private static byte[] FAMILYNAME = Bytes.toBytes("testCf");
087  private static long TS = EnvironmentEdgeManager.currentTime();
088  private static int MAXMVCC = 7;
089  private static byte[] ROW = Bytes.toBytes("testRow");
090  private static final int ROWSIZE = 200;
091  private static byte[][] ROWS = makeN(ROW, ROWSIZE);
092  private static byte[] QUAL = Bytes.toBytes("testQual");
093  private static final int QUALSIZE = 5;
094  private static byte[][] QUALS = makeN(QUAL, QUALSIZE);
095  private static byte[] VALUE = Bytes.toBytes("testValue");
096  private static final int VALUESIZE = 3;
097  private static byte[][] VALUES = makeN(VALUE, VALUESIZE);
098
099  @Rule
100  public TestName name = new TestName();
101
  /**
   * Class-level setup: initializes the {@link ChunkCreator} with the default MemStoreLAB chunk
   * size and the default index-chunk size percentage.
   */
  @BeforeClass
  public static void setUp() {
    // NOTE(review): the positional false / 0 / 0 / 0 / null arguments presumably select an
    // on-heap creator without a chunk pool or heap memory manager - confirm against
    // ChunkCreator.initialize.
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
  }
107
108  @Test
109  public void testReversibleStoreFileScanner() throws IOException {
110    FileSystem fs = TEST_UTIL.getTestFileSystem();
111    Path hfilePath =
112      new Path(new Path(TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"), "regionname"),
113        "familyname");
114    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
115    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
116      HFileContextBuilder hcBuilder = new HFileContextBuilder();
117      hcBuilder.withBlockSize(2 * 1024);
118      hcBuilder.withDataBlockEncoding(encoding);
119      HFileContext hFileContext = hcBuilder.build();
120      StoreFileWriter writer =
121        new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
122          .withOutputDir(hfilePath).withFileContext(hFileContext).build();
123      writeStoreFile(writer);
124
125      StoreFileInfo storeFileInfo = StoreFileInfo
126        .createStoreFileInfoForHFile(TEST_UTIL.getConfiguration(), fs, writer.getPath(), true);
127      HStoreFile sf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
128
129      List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
130        Collections.singletonList(sf), false, true, false, false, Long.MAX_VALUE);
131      StoreFileScanner scanner = scanners.get(0);
132      seekTestOfReversibleKeyValueScanner(scanner);
133      for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
134        LOG.info("Setting read point to " + readPoint);
135        scanners = StoreFileScanner.getScannersForStoreFiles(Collections.singletonList(sf), false,
136          true, false, false, readPoint);
137        seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
138      }
139    }
140
141  }
142
143  @Test
144  public void testReversibleMemstoreScanner() throws IOException {
145    MemStore memstore = new DefaultMemStore();
146    writeMemstore(memstore);
147    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
148    seekTestOfReversibleKeyValueScanner(scanners.get(0));
149    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
150      LOG.info("Setting read point to " + readPoint);
151      scanners = memstore.getScanners(readPoint);
152      seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
153    }
154
155  }
156
157  @Test
158  public void testReversibleKeyValueHeap() throws IOException {
159    // write data to one memstore and two store files
160    FileSystem fs = TEST_UTIL.getTestFileSystem();
161    Path hfilePath = new Path(
162      new Path(TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"), "familyname");
163    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
164    HFileContextBuilder hcBuilder = new HFileContextBuilder();
165    hcBuilder.withBlockSize(2 * 1024);
166    HFileContext hFileContext = hcBuilder.build();
167    StoreFileWriter writer1 =
168      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
169        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
170    StoreFileWriter writer2 =
171      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
172        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
173
174    MemStore memstore = new DefaultMemStore();
175    writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, writer2 });
176
177    StoreFileInfo storeFileInfo1 = StoreFileInfo
178      .createStoreFileInfoForHFile(TEST_UTIL.getConfiguration(), fs, writer1.getPath(), true);
179    HStoreFile sf1 = new HStoreFile(storeFileInfo1, BloomType.NONE, cacheConf);
180
181    StoreFileInfo storeFileInfo2 = StoreFileInfo
182      .createStoreFileInfoForHFile(TEST_UTIL.getConfiguration(), fs, writer2.getPath(), true);
183    HStoreFile sf2 = new HStoreFile(storeFileInfo2, BloomType.NONE, cacheConf);
184    /**
185     * Test without MVCC
186     */
187    int startRowNum = ROWSIZE / 2;
188    ReversedKeyValueHeap kvHeap =
189      getReversibleKeyValueHeap(memstore, sf1, sf2, ROWS[startRowNum], MAXMVCC);
190    internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
191
192    startRowNum = ROWSIZE - 1;
193    kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, HConstants.EMPTY_START_ROW, MAXMVCC);
194    internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
195
196    /**
197     * Test with MVCC
198     */
199    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
200      LOG.info("Setting read point to " + readPoint);
201      startRowNum = ROWSIZE - 1;
202      kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, HConstants.EMPTY_START_ROW, readPoint);
203      for (int i = startRowNum; i >= 0; i--) {
204        if (i - 2 < 0) break;
205        i = i - 2;
206        kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
207        Pair<Integer, Integer> nextReadableNum =
208          getNextReadableNumWithBackwardScan(i, 0, readPoint);
209        if (nextReadableNum == null) break;
210        KeyValue expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
211        assertEquals(expecedKey, kvHeap.peek());
212        i = nextReadableNum.getFirst();
213        int qualNum = nextReadableNum.getSecond();
214        if (qualNum + 1 < QUALSIZE) {
215          kvHeap.backwardSeek(makeKV(i, qualNum + 1));
216          nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, readPoint);
217          if (nextReadableNum == null) break;
218          expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
219          assertEquals(expecedKey, kvHeap.peek());
220          i = nextReadableNum.getFirst();
221          qualNum = nextReadableNum.getSecond();
222        }
223
224        kvHeap.next();
225
226        if (qualNum + 1 >= QUALSIZE) {
227          nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0, readPoint);
228        } else {
229          nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, readPoint);
230        }
231        if (nextReadableNum == null) break;
232        expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
233        assertEquals(expecedKey, kvHeap.peek());
234        i = nextReadableNum.getFirst();
235      }
236    }
237  }
238
239  @Test
240  public void testReversibleStoreScanner() throws IOException {
241    // write data to one memstore and two store files
242    FileSystem fs = TEST_UTIL.getTestFileSystem();
243    Path hfilePath = new Path(
244      new Path(TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"), "familyname");
245    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
246    HFileContextBuilder hcBuilder = new HFileContextBuilder();
247    hcBuilder.withBlockSize(2 * 1024);
248    HFileContext hFileContext = hcBuilder.build();
249    StoreFileWriter writer1 =
250      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
251        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
252    StoreFileWriter writer2 =
253      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
254        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
255
256    MemStore memstore = new DefaultMemStore();
257    writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, writer2 });
258
259    StoreFileInfo storeFileInfo1 = StoreFileInfo
260      .createStoreFileInfoForHFile(TEST_UTIL.getConfiguration(), fs, writer1.getPath(), true);
261    HStoreFile sf1 = new HStoreFile(storeFileInfo1, BloomType.NONE, cacheConf);
262
263    StoreFileInfo storeFileInfo2 = StoreFileInfo
264      .createStoreFileInfoForHFile(TEST_UTIL.getConfiguration(), fs, writer2.getPath(), true);
265    HStoreFile sf2 = new HStoreFile(storeFileInfo2, BloomType.NONE, cacheConf);
266
267    ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE,
268      Long.MAX_VALUE, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0,
269      CellComparatorImpl.COMPARATOR, false);
270
271    // Case 1.Test a full reversed scan
272    Scan scan = new Scan();
273    scan.setReversed(true);
274    StoreScanner storeScanner =
275      getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
276    verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false);
277
278    // Case 2.Test reversed scan with a specified start row
279    int startRowNum = ROWSIZE / 2;
280    byte[] startRow = ROWS[startRowNum];
281    scan.withStartRow(startRow);
282    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
283    verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1), startRowNum + 1, false);
284
285    // Case 3.Test reversed scan with a specified start row and specified
286    // qualifiers
287    assertTrue(QUALSIZE > 2);
288    scan.addColumn(FAMILYNAME, QUALS[0]);
289    scan.addColumn(FAMILYNAME, QUALS[2]);
290    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
291    verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1, false);
292
293    // Case 4.Test reversed scan with mvcc based on case 3
294    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
295      LOG.info("Setting read point to " + readPoint);
296      storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, readPoint);
297      int expectedRowCount = 0;
298      int expectedKVCount = 0;
299      for (int i = startRowNum; i >= 0; i--) {
300        int kvCount = 0;
301        if (makeMVCC(i, 0) <= readPoint) {
302          kvCount++;
303        }
304        if (makeMVCC(i, 2) <= readPoint) {
305          kvCount++;
306        }
307        if (kvCount > 0) {
308          expectedRowCount++;
309          expectedKVCount += kvCount;
310        }
311      }
312      verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount, false);
313    }
314  }
315
316  @Test
317  public void testReversibleRegionScanner() throws IOException {
318    byte[] FAMILYNAME2 = Bytes.toBytes("testCf2");
319    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
320      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYNAME))
321      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYNAME2)).build();
322    HRegion region = TEST_UTIL.createLocalHRegion(htd, null, null);
323    loadDataToRegion(region, FAMILYNAME2);
324
325    // verify row count with forward scan
326    Scan scan = new Scan();
327    InternalScanner scanner = region.getScanner(scan);
328    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true);
329
330    // Case1:Full reversed scan
331    scan.setReversed(true);
332    scanner = region.getScanner(scan);
333    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false);
334
335    // Case2:Full reversed scan with one family
336    scan = new Scan();
337    scan.setReversed(true);
338    scan.addFamily(FAMILYNAME);
339    scanner = region.getScanner(scan);
340    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false);
341
342    // Case3:Specify qualifiers + One family
343    byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] };
344    for (byte[] specifiedQualifier : specifiedQualifiers)
345      scan.addColumn(FAMILYNAME, specifiedQualifier);
346    scanner = region.getScanner(scan);
347    verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false);
348
349    // Case4:Specify qualifiers + Two families
350    for (byte[] specifiedQualifier : specifiedQualifiers)
351      scan.addColumn(FAMILYNAME2, specifiedQualifier);
352    scanner = region.getScanner(scan);
353    verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false);
354
355    // Case5: Case4 + specify start row
356    int startRowNum = ROWSIZE * 3 / 4;
357    scan.withStartRow(ROWS[startRowNum]);
358    scanner = region.getScanner(scan);
359    verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1), false);
360
361    // Case6: Case4 + specify stop row
362    int stopRowNum = ROWSIZE / 4;
363    scan.withStartRow(HConstants.EMPTY_BYTE_ARRAY);
364    scan.withStopRow(ROWS[stopRowNum]);
365    scanner = region.getScanner(scan);
366    verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE - stopRowNum - 1),
367      false);
368
369    // Case7: Case4 + specify start row + specify stop row
370    scan.withStartRow(ROWS[startRowNum]);
371    scanner = region.getScanner(scan);
372    verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2, (startRowNum - stopRowNum),
373      false);
374
375    // Case8: Case7 + SingleColumnValueFilter
376    int valueNum = startRowNum % VALUESIZE;
377    Filter filter = new SingleColumnValueFilter(FAMILYNAME, specifiedQualifiers[0],
378      CompareOperator.EQUAL, VALUES[valueNum]);
379    scan.setFilter(filter);
380    scanner = region.getScanner(scan);
381    int unfilteredRowNum =
382      (startRowNum - stopRowNum) / VALUESIZE + (stopRowNum / VALUESIZE == valueNum ? 0 : 1);
383    verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum, false);
384
385    // Case9: Case7 + PageFilter
386    int pageSize = 10;
387    filter = new PageFilter(pageSize);
388    scan.setFilter(filter);
389    scanner = region.getScanner(scan);
390    int expectedRowNum = pageSize;
391    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
392
393    // Case10: Case7 + FilterList+MUST_PASS_ONE
394    SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter(FAMILYNAME,
395      specifiedQualifiers[0], CompareOperator.EQUAL, VALUES[0]);
396    SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter(FAMILYNAME,
397      specifiedQualifiers[0], CompareOperator.EQUAL, VALUES[1]);
398    expectedRowNum = 0;
399    for (int i = startRowNum; i > stopRowNum; i--) {
400      if (i % VALUESIZE == 0 || i % VALUESIZE == 1) {
401        expectedRowNum++;
402      }
403    }
404    filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2);
405    scan.setFilter(filter);
406    scanner = region.getScanner(scan);
407    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
408
409    // Case10: Case7 + FilterList+MUST_PASS_ALL
410    filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2);
411    expectedRowNum = 0;
412    scan.setFilter(filter);
413    scanner = region.getScanner(scan);
414    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
415  }
416
417  private StoreScanner getReversibleStoreScanner(MemStore memstore, HStoreFile sf1, HStoreFile sf2,
418    Scan scan, ScanInfo scanInfo, int readPoint) throws IOException {
419    List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null, false, readPoint);
420    NavigableSet<byte[]> columns = null;
421    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
422      // Should only one family
423      columns = entry.getValue();
424    }
425    StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, columns, scanners);
426    return storeScanner;
427  }
428
429  private void verifyCountAndOrder(InternalScanner scanner, int expectedKVCount,
430    int expectedRowCount, boolean forward) throws IOException {
431    List<Cell> kvList = new ArrayList<>();
432    Result lastResult = null;
433    int rowCount = 0;
434    int kvCount = 0;
435    try {
436      while (scanner.next(kvList)) {
437        if (kvList.isEmpty()) continue;
438        rowCount++;
439        kvCount += kvList.size();
440        if (lastResult != null) {
441          Result curResult = Result.create(kvList);
442          assertEquals("LastResult:" + lastResult + "CurResult:" + curResult, forward,
443            Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0);
444        }
445        lastResult = Result.create(kvList);
446        kvList.clear();
447      }
448    } finally {
449      scanner.close();
450    }
451    if (!kvList.isEmpty()) {
452      rowCount++;
453      kvCount += kvList.size();
454      kvList.clear();
455    }
456    assertEquals(expectedKVCount, kvCount);
457    assertEquals(expectedRowCount, rowCount);
458  }
459
460  private void internalTestSeekAndNextForReversibleKeyValueHeap(ReversedKeyValueHeap kvHeap,
461    int startRowNum) throws IOException {
462    // Test next and seek
463    for (int i = startRowNum; i >= 0; i--) {
464      if (i % 2 == 1 && i - 2 >= 0) {
465        i = i - 2;
466        kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
467      }
468      for (int j = 0; j < QUALSIZE; j++) {
469        if (j % 2 == 1 && (j + 1) < QUALSIZE) {
470          j = j + 1;
471          kvHeap.backwardSeek(makeKV(i, j));
472        }
473        assertEquals(makeKV(i, j), kvHeap.peek());
474        kvHeap.next();
475      }
476    }
477    assertEquals(null, kvHeap.peek());
478  }
479
480  private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, HStoreFile sf1,
481    HStoreFile sf2, byte[] startRow, int readPoint) throws IOException {
482    List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow, true, readPoint);
483    ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners, CellComparatorImpl.COMPARATOR);
484    return kvHeap;
485  }
486
487  private List<KeyValueScanner> getScanners(MemStore memstore, HStoreFile sf1, HStoreFile sf2,
488    byte[] startRow, boolean doSeek, int readPoint) throws IOException {
489    List<StoreFileScanner> fileScanners = StoreFileScanner
490      .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true, false, false, readPoint);
491    List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
492    List<KeyValueScanner> scanners = new ArrayList<>(fileScanners.size() + 1);
493    scanners.addAll(fileScanners);
494    scanners.addAll(memScanners);
495
496    if (doSeek) {
497      if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) {
498        for (KeyValueScanner scanner : scanners) {
499          scanner.seekToLastRow();
500        }
501      } else {
502        KeyValue startKey = KeyValueUtil.createFirstOnRow(startRow);
503        for (KeyValueScanner scanner : scanners) {
504          scanner.backwardSeek(startKey);
505        }
506      }
507    }
508    return scanners;
509  }
510
511  private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner) throws IOException {
512    /**
513     * Test without MVCC
514     */
515    // Test seek to last row
516    assertTrue(scanner.seekToLastRow());
517    assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek());
518
519    // Test backward seek in three cases
520    // Case1: seek in the same row in backwardSeek
521    KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2);
522    assertTrue(scanner.backwardSeek(seekKey));
523    assertEquals(seekKey, scanner.peek());
524
525    // Case2: seek to the previous row in backwardSeek
526    int seekRowNum = ROWSIZE - 2;
527    assertTrue(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[seekRowNum])));
528    KeyValue expectedKey = makeKV(seekRowNum - 1, 0);
529    assertEquals(expectedKey, scanner.peek());
530
531    // Case3: unable to backward seek
532    assertFalse(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[0])));
533    assertEquals(null, scanner.peek());
534
535    // Test seek to previous row
536    seekRowNum = ROWSIZE - 4;
537    assertTrue(scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum])));
538    expectedKey = makeKV(seekRowNum - 1, 0);
539    assertEquals(expectedKey, scanner.peek());
540
541    // Test seek to previous row for the first row
542    assertFalse(scanner.seekToPreviousRow(makeKV(0, 0)));
543    assertEquals(null, scanner.peek());
544
545  }
546
547  private void seekTestOfReversibleKeyValueScannerWithMVCC(List<? extends KeyValueScanner> scanners,
548    int readPoint) throws IOException {
549    /**
550     * Test with MVCC
551     */
552    // Test seek to last row
553    KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 1, 0, readPoint);
554    boolean res = false;
555    for (KeyValueScanner scanner : scanners) {
556      res |= scanner.seekToLastRow();
557    }
558    assertEquals(expectedKey != null, res);
559    res = false;
560    for (KeyValueScanner scanner : scanners) {
561      res |= (expectedKey.equals(scanner.peek()));
562    }
563    assertTrue(res);
564
565    // Test backward seek in two cases
566    // Case1: seek in the same row in backwardSeek
567    expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2, QUALSIZE - 2, readPoint);
568    res = false;
569    for (KeyValueScanner scanner : scanners) {
570      res |= scanner.backwardSeek(expectedKey);
571    }
572    assertEquals(expectedKey != null, res);
573    res = false;
574    for (KeyValueScanner scanner : scanners) {
575      res |= (expectedKey.equals(scanner.peek()));
576    }
577    assertTrue(res);
578
579    // Case2: seek to the previous row in backwardSeek
580    int seekRowNum = ROWSIZE - 3;
581    res = false;
582    for (KeyValueScanner scanner : scanners) {
583      res |= scanner.backwardSeek(expectedKey);
584    }
585    res = false;
586    for (KeyValueScanner scanner : scanners) {
587      res |= (expectedKey.equals(scanner.peek()));
588    }
589    assertTrue(res);
590
591    // Test seek to previous row
592    seekRowNum = ROWSIZE - 4;
593    expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, readPoint);
594    res = false;
595    for (KeyValueScanner scanner : scanners) {
596      res |= scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum]));
597    }
598    assertEquals(expectedKey != null, res);
599    res = false;
600    for (KeyValueScanner scanner : scanners) {
601      res |= (expectedKey.equals(scanner.peek()));
602    }
603    assertTrue(res);
604  }
605
606  private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum, int startQualNum,
607    int readPoint) {
608    Pair<Integer, Integer> nextReadableNum =
609      getNextReadableNumWithBackwardScan(startRowNum, startQualNum, readPoint);
610    if (nextReadableNum == null) return null;
611    return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
612  }
613
614  private Pair<Integer, Integer> getNextReadableNumWithBackwardScan(int startRowNum,
615    int startQualNum, int readPoint) {
616    Pair<Integer, Integer> nextReadableNum = null;
617    boolean findExpected = false;
618    for (int i = startRowNum; i >= 0; i--) {
619      for (int j = (i == startRowNum ? startQualNum : 0); j < QUALSIZE; j++) {
620        if (makeMVCC(i, j) <= readPoint) {
621          nextReadableNum = new Pair<>(i, j);
622          findExpected = true;
623          break;
624        }
625      }
626      if (findExpected) break;
627    }
628    return nextReadableNum;
629  }
630
631  private static void loadDataToRegion(HRegion region, byte[] additionalFamily) throws IOException {
632    for (int i = 0; i < ROWSIZE; i++) {
633      Put put = new Put(ROWS[i]);
634      for (int j = 0; j < QUALSIZE; j++) {
635        put.add(makeKV(i, j));
636        // put additional family
637        put.add(makeKV(i, j, additionalFamily));
638      }
639      region.put(put);
640      if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) {
641        region.flush(true);
642      }
643    }
644  }
645
646  private static void writeMemstoreAndStoreFiles(MemStore memstore, final StoreFileWriter[] writers)
647    throws IOException {
648    try {
649      for (int i = 0; i < ROWSIZE; i++) {
650        for (int j = 0; j < QUALSIZE; j++) {
651          if (i % 2 == 0) {
652            memstore.add(makeKV(i, j), null);
653          } else {
654            writers[(i + j) % writers.length].append(makeKV(i, j));
655          }
656        }
657      }
658    } finally {
659      for (int i = 0; i < writers.length; i++) {
660        writers[i].close();
661      }
662    }
663  }
664
665  private static void writeStoreFile(final StoreFileWriter writer) throws IOException {
666    try {
667      for (int i = 0; i < ROWSIZE; i++) {
668        for (int j = 0; j < QUALSIZE; j++) {
669          writer.append(makeKV(i, j));
670        }
671      }
672    } finally {
673      writer.close();
674    }
675  }
676
677  private static void writeMemstore(MemStore memstore) throws IOException {
678    // Add half of the keyvalues to memstore
679    for (int i = 0; i < ROWSIZE; i++) {
680      for (int j = 0; j < QUALSIZE; j++) {
681        if ((i + j) % 2 == 0) {
682          memstore.add(makeKV(i, j), null);
683        }
684      }
685    }
686    memstore.snapshot();
687    // Add another half of the keyvalues to snapshot
688    for (int i = 0; i < ROWSIZE; i++) {
689      for (int j = 0; j < QUALSIZE; j++) {
690        if ((i + j) % 2 == 1) {
691          memstore.add(makeKV(i, j), null);
692        }
693      }
694    }
695  }
696
  /**
   * Builds the test cell for the given row and qualifier index under the default family
   * {@code FAMILYNAME}.
   * @param rowNum index into {@code ROWS}
   * @param cqNum  index into {@code QUALS}
   * @return the generated cell
   */
  private static KeyValue makeKV(int rowNum, int cqNum) {
    return makeKV(rowNum, cqNum, FAMILYNAME);
  }
700
701  private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) {
702    KeyValue kv =
703      new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS, VALUES[rowNum % VALUESIZE]);
704    kv.setSequenceId(makeMVCC(rowNum, cqNum));
705    return kv;
706  }
707
708  private static long makeMVCC(int rowNum, int cqNum) {
709    return (rowNum + cqNum) % (MAXMVCC + 1);
710  }
711
712  private static byte[][] makeN(byte[] base, int n) {
713    byte[][] ret = new byte[n][];
714    for (int i = 0; i < n; i++) {
715      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
716    }
717    return ret;
718  }
719}