001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.List;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.HColumnDescriptor;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.HRegionInfo;
039import org.apache.hadoop.hbase.HTableDescriptor;
040import org.apache.hadoop.hbase.HTestConst;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.UnknownScannerException;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.ResultScanner;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.filter.Filter;
051import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
052import org.apache.hadoop.hbase.filter.PrefixFilter;
053import org.apache.hadoop.hbase.filter.WhileMatchFilter;
054import org.apache.hadoop.hbase.testclassification.MediumTests;
055import org.apache.hadoop.hbase.testclassification.RegionServerTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.junit.ClassRule;
059import org.junit.Rule;
060import org.junit.Test;
061import org.junit.experimental.categories.Category;
062import org.junit.rules.TestName;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
067 * Test of a long-lived scanner validating as we go.
068 */
069@Category({ RegionServerTests.class, MediumTests.class })
070public class TestScanner {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074    HBaseClassTestRule.forClass(TestScanner.class);
075
076  @Rule
077  public TestName name = new TestName();
078
079  private static final Logger LOG = LoggerFactory.getLogger(TestScanner.class);
080  private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
081
082  private static final byte[] FIRST_ROW = HConstants.EMPTY_START_ROW;
083  private static final byte[][] COLS = { HConstants.CATALOG_FAMILY };
084  private static final byte[][] EXPLICIT_COLS =
085    { HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
086    // TODO ryan
087    // HConstants.STARTCODE_QUALIFIER
088    };
089
090  static final HTableDescriptor TESTTABLEDESC =
091    new HTableDescriptor(TableName.valueOf("testscanner"));
092  static {
093    TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)
094      // Ten is an arbitrary number. Keep versions to help debugging.
095      .setMaxVersions(10).setBlockCacheEnabled(false).setBlocksize(8 * 1024));
096  }
097  /** HRegionInfo for root region */
098  public static final HRegionInfo REGION_INFO = new HRegionInfo(TESTTABLEDESC.getTableName(),
099    HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
100
101  private static final byte[] ROW_KEY = REGION_INFO.getRegionName();
102
103  private static final long START_CODE = Long.MAX_VALUE;
104
105  private HRegion region;
106
107  private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
108  final private byte[] col1;
109
110  public TestScanner() {
111    super();
112
113    firstRowBytes = START_KEY_BYTES;
114    secondRowBytes = START_KEY_BYTES.clone();
115    // Increment the least significant character so we get to next row.
116    secondRowBytes[START_KEY_BYTES.length - 1]++;
117    thirdRowBytes = START_KEY_BYTES.clone();
118    thirdRowBytes[START_KEY_BYTES.length - 1] =
119      (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
120    col1 = Bytes.toBytes("column1");
121  }
122
123  /**
124   * Test basic stop row filter works.
125   */
126  @Test
127  public void testStopRow() throws Exception {
128    byte[] startrow = Bytes.toBytes("bbb");
129    byte[] stoprow = Bytes.toBytes("ccc");
130    try {
131      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
132      HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY);
133      List<Cell> results = new ArrayList<>();
134      // Do simple test of getting one row only first.
135      Scan scan = new Scan(Bytes.toBytes("abc"), Bytes.toBytes("abd"));
136      scan.addFamily(HConstants.CATALOG_FAMILY);
137
138      InternalScanner s = region.getScanner(scan);
139      int count = 0;
140      while (s.next(results)) {
141        count++;
142      }
143      s.close();
144      assertEquals(0, count);
145      // Now do something a bit more imvolved.
146      scan = new Scan(startrow, stoprow);
147      scan.addFamily(HConstants.CATALOG_FAMILY);
148
149      s = region.getScanner(scan);
150      count = 0;
151      Cell kv = null;
152      results = new ArrayList<>();
153      for (boolean first = true; s.next(results);) {
154        kv = results.get(0);
155        if (first) {
156          assertTrue(CellUtil.matchingRows(kv, startrow));
157          first = false;
158        }
159        count++;
160      }
161      assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(kv)) > 0);
162      // We got something back.
163      assertTrue(count > 10);
164      s.close();
165    } finally {
166      HBaseTestingUtility.closeRegionAndWAL(this.region);
167    }
168  }
169
170  void rowPrefixFilter(Scan scan) throws IOException {
171    List<Cell> results = new ArrayList<>();
172    scan.addFamily(HConstants.CATALOG_FAMILY);
173    InternalScanner s = region.getScanner(scan);
174    boolean hasMore = true;
175    while (hasMore) {
176      hasMore = s.next(results);
177      for (Cell kv : results) {
178        assertEquals((byte) 'a', CellUtil.cloneRow(kv)[0]);
179        assertEquals((byte) 'b', CellUtil.cloneRow(kv)[1]);
180      }
181      results.clear();
182    }
183    s.close();
184  }
185
186  void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
187    List<Cell> results = new ArrayList<>();
188    scan.addFamily(HConstants.CATALOG_FAMILY);
189    InternalScanner s = region.getScanner(scan);
190    boolean hasMore = true;
191    while (hasMore) {
192      hasMore = s.next(results);
193      for (Cell kv : results) {
194        assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0);
195      }
196      results.clear();
197    }
198    s.close();
199  }
200
201  @Test
202  public void testFilters() throws IOException {
203    try {
204      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
205      HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY);
206      byte[] prefix = Bytes.toBytes("ab");
207      Filter newFilter = new PrefixFilter(prefix);
208      Scan scan = new Scan();
209      scan.setFilter(newFilter);
210      rowPrefixFilter(scan);
211
212      byte[] stopRow = Bytes.toBytes("bbc");
213      newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
214      scan = new Scan();
215      scan.setFilter(newFilter);
216      rowInclusiveStopFilter(scan, stopRow);
217
218    } finally {
219      HBaseTestingUtility.closeRegionAndWAL(this.region);
220    }
221  }
222
223  /**
224   * Test that closing a scanner while a client is using it doesn't throw NPEs but instead a
225   * UnknownScannerException. HBASE-2503
226   */
227  @Test
228  public void testRaceBetweenClientAndTimeout() throws Exception {
229    try {
230      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
231      HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY);
232      Scan scan = new Scan();
233      InternalScanner s = region.getScanner(scan);
234      List<Cell> results = new ArrayList<>();
235      try {
236        s.next(results);
237        s.close();
238        s.next(results);
239        fail("We don't want anything more, we should be failing");
240      } catch (UnknownScannerException ex) {
241        // ok!
242        return;
243      }
244    } finally {
245      HBaseTestingUtility.closeRegionAndWAL(this.region);
246    }
247  }
248
249  /**
250   * The test!
251   */
252  @Test
253  public void testScanner() throws IOException {
254    try {
255      region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
256      Table table = new RegionAsTable(region);
257
258      // Write information to the meta table
259
260      Put put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime());
261
262      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
263        REGION_INFO.toByteArray());
264      table.put(put);
265
266      // What we just committed is in the memstore. Verify that we can get
267      // it back both with scanning and get
268
269      scan(false, null);
270      getRegionInfo(table);
271
272      // Close and re-open
273
274      ((HRegion) region).close();
275      region = HRegion.openHRegion(region, null);
276      table = new RegionAsTable(region);
277
278      // Verify we can get the data back now that it is on disk.
279
280      scan(false, null);
281      getRegionInfo(table);
282
283      // Store some new information
284
285      String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtility.randomFreePort();
286
287      put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime());
288      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address));
289
290      // put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE));
291
292      table.put(put);
293
294      // Validate that we can still get the HRegionInfo, even though it is in
295      // an older row on disk and there is a newer row in the memstore
296
297      scan(true, address.toString());
298      getRegionInfo(table);
299
300      // flush cache
301      this.region.flush(true);
302
303      // Validate again
304
305      scan(true, address.toString());
306      getRegionInfo(table);
307
308      // Close and reopen
309
310      ((HRegion) region).close();
311      region = HRegion.openHRegion(region, null);
312      table = new RegionAsTable(region);
313
314      // Validate again
315
316      scan(true, address.toString());
317      getRegionInfo(table);
318
319      // Now update the information again
320
321      address = "bar.foo.com:4321";
322
323      put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime());
324
325      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address));
326      table.put(put);
327
328      // Validate again
329
330      scan(true, address.toString());
331      getRegionInfo(table);
332
333      // flush cache
334
335      region.flush(true);
336
337      // Validate again
338
339      scan(true, address.toString());
340      getRegionInfo(table);
341
342      // Close and reopen
343
344      ((HRegion) this.region).close();
345      this.region = HRegion.openHRegion(region, null);
346      table = new RegionAsTable(this.region);
347
348      // Validate again
349
350      scan(true, address.toString());
351      getRegionInfo(table);
352
353    } finally {
354      // clean up
355      HBaseTestingUtility.closeRegionAndWAL(this.region);
356    }
357  }
358
359  /** Compare the HRegionInfo we read from HBase to what we stored */
360  private void validateRegionInfo(byte[] regionBytes) throws IOException {
361    HRegionInfo info = HRegionInfo.parseFromOrNull(regionBytes);
362
363    assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
364    assertEquals(0, info.getStartKey().length);
365    assertEquals(0, info.getEndKey().length);
366    assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
367    // assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc()));
368  }
369
370  /** Use a scanner to get the region info and then validate the results */
371  private void scan(boolean validateStartcode, String serverName) throws IOException {
372    InternalScanner scanner = null;
373    Scan scan = null;
374    List<Cell> results = new ArrayList<>();
375    byte[][][] scanColumns = { COLS, EXPLICIT_COLS };
376    for (int i = 0; i < scanColumns.length; i++) {
377      try {
378        scan = new Scan(FIRST_ROW);
379        for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) {
380          scan.addColumn(COLS[0], EXPLICIT_COLS[ii]);
381        }
382        scanner = region.getScanner(scan);
383        while (scanner.next(results)) {
384          assertTrue(
385            hasColumn(results, HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
386          byte[] val = CellUtil.cloneValue(
387            getColumn(results, HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
388          validateRegionInfo(val);
389          if (validateStartcode) {
390            // assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
391            // HConstants.STARTCODE_QUALIFIER));
392            // val = getColumn(results, HConstants.CATALOG_FAMILY,
393            // HConstants.STARTCODE_QUALIFIER).getValue();
394            assertNotNull(val);
395            assertFalse(val.length == 0);
396            long startCode = Bytes.toLong(val);
397            assertEquals(START_CODE, startCode);
398          }
399
400          if (serverName != null) {
401            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER));
402            val = CellUtil.cloneValue(
403              getColumn(results, HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER));
404            assertNotNull(val);
405            assertFalse(val.length == 0);
406            String server = Bytes.toString(val);
407            assertEquals(0, server.compareTo(serverName));
408          }
409        }
410      } finally {
411        InternalScanner s = scanner;
412        scanner = null;
413        if (s != null) {
414          s.close();
415        }
416      }
417    }
418  }
419
420  private boolean hasColumn(final List<Cell> kvs, final byte[] family, final byte[] qualifier) {
421    for (Cell kv : kvs) {
422      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
423        return true;
424      }
425    }
426    return false;
427  }
428
429  private Cell getColumn(final List<Cell> kvs, final byte[] family, final byte[] qualifier) {
430    for (Cell kv : kvs) {
431      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
432        return kv;
433      }
434    }
435    return null;
436  }
437
438  /** Use get to retrieve the HRegionInfo and validate it */
439  private void getRegionInfo(Table table) throws IOException {
440    Get get = new Get(ROW_KEY);
441    get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
442    Result result = table.get(get);
443    byte[] bytes = result.value();
444    validateRegionInfo(bytes);
445  }
446
447  /**
448   * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner update
449   * readers code essentially. This is not highly concurrent, since its all 1 thread. HBase-910.
450   */
451  @Test
452  public void testScanAndSyncFlush() throws Exception {
453    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
454    Table hri = new RegionAsTable(region);
455    try {
456      LOG.info("Added: " + HTestConst.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
457        Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
458      int count = count(hri, -1, false);
459      assertEquals(count, count(hri, 100, false)); // do a sync flush.
460    } catch (Exception e) {
461      LOG.error("Failed", e);
462      throw e;
463    } finally {
464      HBaseTestingUtility.closeRegionAndWAL(this.region);
465    }
466  }
467
468  /**
469   * Tests to do a concurrent flush (using a 2nd thread) while scanning. This tests both the
470   * StoreScanner update readers and the transition from memstore -> snapshot -> store file.
471   */
472  @Test
473  public void testScanAndRealConcurrentFlush() throws Exception {
474    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
475    Table hri = new RegionAsTable(region);
476    try {
477      LOG.info("Added: " + HTestConst.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
478        Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
479      int count = count(hri, -1, false);
480      assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush
481    } catch (Exception e) {
482      LOG.error("Failed", e);
483      throw e;
484    } finally {
485      HBaseTestingUtility.closeRegionAndWAL(this.region);
486    }
487  }
488
489  /**
490   * Make sure scanner returns correct result when we run a major compaction with deletes.
491   */
492  @Test
493  @SuppressWarnings("deprecation")
494  public void testScanAndConcurrentMajorCompact() throws Exception {
495    HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name.getMethodName());
496    this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
497    Table hri = new RegionAsTable(region);
498
499    try {
500      HTestConst.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1), firstRowBytes,
501        secondRowBytes);
502      HTestConst.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1), firstRowBytes,
503        secondRowBytes);
504
505      Delete dc = new Delete(firstRowBytes);
506      /* delete column1 of firstRow */
507      dc.addColumns(fam1, col1);
508      region.delete(dc);
509      region.flush(true);
510
511      HTestConst.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1), secondRowBytes,
512        thirdRowBytes);
513      HTestConst.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1), secondRowBytes,
514        thirdRowBytes);
515      region.flush(true);
516
517      InternalScanner s = region.getScanner(new Scan());
518      // run a major compact, column1 of firstRow will be cleaned.
519      region.compact(true);
520
521      List<Cell> results = new ArrayList<>();
522      s.next(results);
523
524      // make sure returns column2 of firstRow
525      assertTrue("result is not correct, keyValues : " + results, results.size() == 1);
526      assertTrue(CellUtil.matchingRows(results.get(0), firstRowBytes));
527      assertTrue(CellUtil.matchingFamily(results.get(0), fam2));
528
529      results = new ArrayList<>();
530      s.next(results);
531
532      // get secondRow
533      assertTrue(results.size() == 2);
534      assertTrue(CellUtil.matchingRows(results.get(0), secondRowBytes));
535      assertTrue(CellUtil.matchingFamily(results.get(0), fam1));
536      assertTrue(CellUtil.matchingFamily(results.get(1), fam2));
537    } finally {
538      HBaseTestingUtility.closeRegionAndWAL(this.region);
539    }
540  }
541
542  /**
543   * Count table.
544   * @param hri        Region
545   * @param flushIndex At what row we start the flush.
546   * @param concurrent if the flush should be concurrent or sync.
547   * @return Count of rows found.
548   */
549  private int count(final Table countTable, final int flushIndex, boolean concurrent)
550    throws Exception {
551    LOG.info("Taking out counting scan");
552    Scan scan = new Scan();
553    for (byte[] qualifier : EXPLICIT_COLS) {
554      scan.addColumn(HConstants.CATALOG_FAMILY, qualifier);
555    }
556    ResultScanner s = countTable.getScanner(scan);
557    int count = 0;
558    boolean justFlushed = false;
559    while (s.next() != null) {
560      if (justFlushed) {
561        LOG.info("after next() just after next flush");
562        justFlushed = false;
563      }
564      count++;
565      if (flushIndex == count) {
566        LOG.info("Starting flush at flush index " + flushIndex);
567        Thread t = new Thread() {
568          @Override
569          public void run() {
570            try {
571              region.flush(true);
572              LOG.info("Finishing flush");
573            } catch (IOException e) {
574              LOG.info("Failed flush cache");
575            }
576          }
577        };
578        t.start();
579        if (!concurrent) {
580          // sync flush
581          t.join();
582        }
583        LOG.info("Continuing on after kicking off background flush");
584        justFlushed = true;
585      }
586    }
587    s.close();
588    LOG.info("Found " + count + " items");
589    return count;
590  }
591}