001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.stream.Collectors;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileStatus;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellScanner;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.StartTestingClusterOption;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
035import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
036import org.apache.hadoop.hbase.regionserver.HRegion;
037import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
038import org.apache.hadoop.hbase.regionserver.HRegionServer;
039import org.apache.hadoop.hbase.regionserver.StoreContext;
040import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
041import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
042import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
043import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
044import org.apache.hadoop.hbase.testclassification.ClientTests;
045import org.apache.hadoop.hbase.testclassification.LargeTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.CommonFSUtils;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.util.FSUtils;
050import org.apache.hadoop.hbase.util.HFileArchiveUtil;
051import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
052import org.junit.After;
053import org.junit.Assert;
054import org.junit.Before;
055import org.junit.ClassRule;
056import org.junit.Rule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059import org.junit.rules.TestName;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063@Category({ LargeTests.class, ClientTests.class })
064public class TestTableSnapshotScanner {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestTableSnapshotScanner.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotScanner.class);
071  private final HBaseTestingUtil UTIL = new HBaseTestingUtil();
072  private static final int NUM_REGION_SERVERS = 2;
073  private static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2") };
074  public static byte[] bbb = Bytes.toBytes("bbb");
075  public static byte[] yyy = Bytes.toBytes("yyy");
076
077  private FileSystem fs;
078  private Path rootDir;
079  private boolean clusterUp;
080
081  @Rule
082  public TestName name = new TestName();
083
084  public static void blockUntilSplitFinished(HBaseTestingUtil util, TableName tableName,
085    int expectedRegionSize) throws Exception {
086    for (int i = 0; i < 100; i++) {
087      List<RegionInfo> hRegionInfoList = util.getAdmin().getRegions(tableName);
088      if (hRegionInfoList.size() >= expectedRegionSize) {
089        break;
090      }
091      Thread.sleep(1000);
092    }
093  }
094
095  @Before
096  public void setupCluster() throws Exception {
097    setupConf(UTIL.getConfiguration());
098    StartTestingClusterOption option =
099      StartTestingClusterOption.builder().numRegionServers(NUM_REGION_SERVERS)
100        .numDataNodes(NUM_REGION_SERVERS).createRootDir(true).build();
101    UTIL.startMiniCluster(option);
102    clusterUp = true;
103    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
104    fs = rootDir.getFileSystem(UTIL.getConfiguration());
105  }
106
107  @After
108  public void tearDownCluster() throws Exception {
109    if (clusterUp) {
110      UTIL.shutdownMiniCluster();
111    }
112  }
113
114  protected void setupConf(Configuration conf) {
115    // Enable snapshot
116    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
117  }
118
119  public static void createTableAndSnapshot(HBaseTestingUtil util, TableName tableName,
120    String snapshotName, int numRegions) throws Exception {
121    try {
122      util.deleteTable(tableName);
123    } catch (Exception ex) {
124      // ignore
125    }
126
127    if (numRegions > 1) {
128      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
129    } else {
130      util.createTable(tableName, FAMILIES);
131    }
132    Admin admin = util.getAdmin();
133
134    // put some stuff in the table
135    Table table = util.getConnection().getTable(tableName);
136    util.loadTable(table, FAMILIES);
137
138    Path rootDir = CommonFSUtils.getRootDir(util.getConfiguration());
139    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
140
141    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES), null,
142      snapshotName, rootDir, fs, true);
143
144    // load different values
145    byte[] value = Bytes.toBytes("after_snapshot_value");
146    util.loadTable(table, FAMILIES, value);
147
148    // cause flush to create new files in the region
149    admin.flush(tableName);
150    table.close();
151  }
152
153  @Test
154  public void testNoDuplicateResultsWhenSplitting() throws Exception {
155    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
156    String snapshotName = "testSnapshotBug";
157    try {
158      if (UTIL.getAdmin().tableExists(tableName)) {
159        UTIL.deleteTable(tableName);
160      }
161
162      UTIL.createTable(tableName, FAMILIES);
163      Admin admin = UTIL.getAdmin();
164
165      // put some stuff in the table
166      Table table = UTIL.getConnection().getTable(tableName);
167      UTIL.loadTable(table, FAMILIES);
168
169      // split to 2 regions
170      admin.split(tableName, Bytes.toBytes("eee"));
171      blockUntilSplitFinished(UTIL, tableName, 2);
172
173      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
174      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
175
176      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
177        null, snapshotName, rootDir, fs, true);
178
179      // load different values
180      byte[] value = Bytes.toBytes("after_snapshot_value");
181      UTIL.loadTable(table, FAMILIES, value);
182
183      // cause flush to create new files in the region
184      admin.flush(tableName);
185      table.close();
186
187      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
188      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan
189
190      TableSnapshotScanner scanner =
191        new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
192
193      verifyScanner(scanner, bbb, yyy);
194      scanner.close();
195    } catch (Exception e) {
196      e.printStackTrace();
197    } finally {
198      UTIL.getAdmin().deleteSnapshot(snapshotName);
199      UTIL.deleteTable(tableName);
200    }
201  }
202
203  @Test
204  public void testScanLimit() throws Exception {
205    final TableName tableName = TableName.valueOf(name.getMethodName());
206    final String snapshotName = tableName + "Snapshot";
207    TableSnapshotScanner scanner = null;
208    try {
209      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
210      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
211      Scan scan = new Scan().withStartRow(bbb).setLimit(100); // limit the scan
212
213      scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
214      int count = 0;
215      while (true) {
216        Result result = scanner.next();
217        if (result == null) {
218          break;
219        }
220        count++;
221      }
222      Assert.assertEquals(100, count);
223    } finally {
224      if (scanner != null) {
225        scanner.close();
226      }
227      UTIL.getAdmin().deleteSnapshot(snapshotName);
228      UTIL.deleteTable(tableName);
229    }
230  }
231
232  @Test
233  public void testWithSingleRegion() throws Exception {
234    testScanner(UTIL, "testWithSingleRegion", 1, false);
235  }
236
237  @Test
238  public void testWithMultiRegion() throws Exception {
239    testScanner(UTIL, "testWithMultiRegion", 10, false);
240  }
241
242  @Test
243  public void testWithOfflineHBaseMultiRegion() throws Exception {
244    testScanner(UTIL, "testWithMultiRegion", 20, true);
245  }
246
247  @Test
248  public void testScannerWithRestoreScanner() throws Exception {
249    TableName tableName = TableName.valueOf("testScanner");
250    String snapshotName = "testScannerWithRestoreScanner";
251    try {
252      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
253      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
254      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan
255
256      Configuration conf = UTIL.getConfiguration();
257      Path rootDir = CommonFSUtils.getRootDir(conf);
258
259      TableSnapshotScanner scanner0 =
260        new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
261      verifyScanner(scanner0, bbb, yyy);
262      scanner0.close();
263
264      // restore snapshot.
265      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
266
267      // scan the snapshot without restoring snapshot
268      TableSnapshotScanner scanner =
269        new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
270      verifyScanner(scanner, bbb, yyy);
271      scanner.close();
272
273      // check whether the snapshot has been deleted by the close of scanner.
274      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
275      verifyScanner(scanner, bbb, yyy);
276      scanner.close();
277
278      // restore snapshot again.
279      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
280
281      // check whether the snapshot has been deleted by the close of scanner.
282      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
283      verifyScanner(scanner, bbb, yyy);
284      scanner.close();
285    } finally {
286      UTIL.getAdmin().deleteSnapshot(snapshotName);
287      UTIL.deleteTable(tableName);
288    }
289  }
290
291  private void testScanner(HBaseTestingUtil util, String snapshotName, int numRegions,
292    boolean shutdownCluster) throws Exception {
293    TableName tableName = TableName.valueOf("testScanner");
294    try {
295      createTableAndSnapshot(util, tableName, snapshotName, numRegions);
296
297      if (shutdownCluster) {
298        util.shutdownMiniHBaseCluster();
299        clusterUp = false;
300      }
301
302      Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
303      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan
304
305      TableSnapshotScanner scanner =
306        new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
307
308      verifyScanner(scanner, bbb, yyy);
309      scanner.close();
310    } finally {
311      if (clusterUp) {
312        util.getAdmin().deleteSnapshot(snapshotName);
313        util.deleteTable(tableName);
314      }
315    }
316  }
317
318  private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
319    throws IOException, InterruptedException {
320
321    HBaseTestingUtil.SeenRowTracker rowTracker =
322      new HBaseTestingUtil.SeenRowTracker(startRow, stopRow);
323
324    while (true) {
325      Result result = scanner.next();
326      if (result == null) {
327        break;
328      }
329      verifyRow(result);
330      rowTracker.addRow(result.getRow());
331    }
332
333    // validate all rows are seen
334    rowTracker.validate();
335  }
336
337  private static void verifyRow(Result result) throws IOException {
338    byte[] row = result.getRow();
339    CellScanner scanner = result.cellScanner();
340    while (scanner.advance()) {
341      Cell cell = scanner.current();
342
343      // assert that all Cells in the Result have the same key
344      Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length, cell.getRowArray(),
345        cell.getRowOffset(), cell.getRowLength()));
346    }
347
348    for (int j = 0; j < FAMILIES.length; j++) {
349      byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
350      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
351        + " ,actual:" + Bytes.toString(actual), row, actual);
352    }
353  }
354
355  @Test
356  public void testMergeRegion() throws Exception {
357    TableName tableName = TableName.valueOf("testMergeRegion");
358    String snapshotName = tableName.getNameAsString() + "_snapshot";
359    Configuration conf = UTIL.getConfiguration();
360    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
361    long timeout = 20000; // 20s
362    try (Admin admin = UTIL.getAdmin()) {
363      List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
364        .collect(Collectors.toList());
365      // create table with 3 regions
366      Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3);
367      List<RegionInfo> regions = admin.getRegions(tableName);
368      Assert.assertEquals(3, regions.size());
369      RegionInfo region0 = regions.get(0);
370      RegionInfo region1 = regions.get(1);
371      RegionInfo region2 = regions.get(2);
372      // put some data in the table
373      UTIL.loadTable(table, FAMILIES);
374      admin.flush(tableName);
375      // wait flush is finished
376      UTIL.waitFor(timeout, () -> {
377        try {
378          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
379          for (RegionInfo region : regions) {
380            Path regionDir = new Path(tableDir, region.getEncodedName());
381            for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
382              for (FileStatus fs : fs.listStatus(familyDir)) {
383                if (!fs.getPath().getName().equals(".filelist")) {
384                  return true;
385                }
386              }
387              return false;
388            }
389          }
390          return true;
391        } catch (IOException e) {
392          LOG.warn("Failed check if flush is finished", e);
393          return false;
394        }
395      });
396      // merge 2 regions
397      admin.compactionSwitch(false, serverList);
398      admin.mergeRegionsAsync(region0.getEncodedNameAsBytes(), region1.getEncodedNameAsBytes(),
399        true);
400      UTIL.waitFor(timeout, () -> admin.getRegions(tableName).size() == 2);
401      List<RegionInfo> mergedRegions = admin.getRegions(tableName);
402      RegionInfo mergedRegion =
403        mergedRegions.get(0).getEncodedName().equals(region2.getEncodedName())
404          ? mergedRegions.get(1)
405          : mergedRegions.get(0);
406      // snapshot
407      admin.snapshot(snapshotName, tableName);
408      Assert.assertEquals(1, admin.listSnapshots().size());
409      // major compact
410      admin.compactionSwitch(true, serverList);
411      admin.majorCompactRegion(mergedRegion.getRegionName());
412      // wait until merged region has no reference
413      UTIL.waitFor(timeout, () -> {
414        try {
415          for (RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
416            .getRegionServerThreads()) {
417            HRegionServer regionServer = regionServerThread.getRegionServer();
418            for (HRegion subRegion : regionServer.getRegions(tableName)) {
419              if (
420                subRegion.getRegionInfo().getEncodedName().equals(mergedRegion.getEncodedName())
421              ) {
422                regionServer.getCompactedHFilesDischarger().chore();
423              }
424            }
425          }
426          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
427          HRegionFileSystem regionFs = HRegionFileSystem
428            .openRegionFromFileSystem(UTIL.getConfiguration(), fs, tableDir, mergedRegion, true);
429          boolean references = false;
430          Path regionDir = new Path(tableDir, mergedRegion.getEncodedName());
431          for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
432            StoreContext storeContext = StoreContext.getBuilder()
433              .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of(familyDir.getName()))
434              .withRegionFileSystem(regionFs).withFamilyStoreDirectoryPath(familyDir).build();
435            StoreFileTracker sft =
436              StoreFileTrackerFactory.create(UTIL.getConfiguration(), false, storeContext);
437            references = references || sft.hasReferences();
438            if (references) {
439              break;
440            }
441          }
442          return !references;
443        } catch (IOException e) {
444          LOG.warn("Failed check merged region has no reference", e);
445          return false;
446        }
447      });
448      // run catalog janitor to clean and wait for parent regions are archived
449      UTIL.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting();
450      UTIL.waitFor(timeout, () -> {
451        try {
452          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
453          for (FileStatus fileStatus : fs.listStatus(tableDir)) {
454            String name = fileStatus.getPath().getName();
455            if (name.equals(region0.getEncodedName()) || name.equals(region1.getEncodedName())) {
456              return false;
457            }
458          }
459          return true;
460        } catch (IOException e) {
461          LOG.warn("Check if parent regions are archived error", e);
462          return false;
463        }
464      });
465      // set file modify time and then run cleaner
466      long time = EnvironmentEdgeManager.currentTime() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
467      traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
468      UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().triggerCleanerNow().get();
469      // scan snapshot
470      try (TableSnapshotScanner scanner =
471        new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
472          new Scan().withStartRow(bbb).withStopRow(yyy))) {
473        verifyScanner(scanner, bbb, yyy);
474      }
475    } catch (Exception e) {
476      LOG.error("scan snapshot error", e);
477      Assert.fail("Should not throw Exception: " + e.getMessage());
478    }
479  }
480
481  @Test
482  public void testDeleteTableWithMergedRegions() throws Exception {
483    final TableName tableName = TableName.valueOf(this.name.getMethodName());
484    String snapshotName = tableName.getNameAsString() + "_snapshot";
485    Configuration conf = UTIL.getConfiguration();
486    try (Admin admin = UTIL.getConnection().getAdmin()) {
487      // disable compaction
488      admin.compactionSwitch(false,
489        admin.getRegionServers().stream().map(s -> s.getServerName()).collect(Collectors.toList()));
490      // create table
491      Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3);
492      List<RegionInfo> regions = admin.getRegions(tableName);
493      Assert.assertEquals(3, regions.size());
494      // write some data
495      UTIL.loadTable(table, FAMILIES);
496      // merge region
497      admin.mergeRegionsAsync(new byte[][] { regions.get(0).getEncodedNameAsBytes(),
498        regions.get(1).getEncodedNameAsBytes() }, false).get();
499      regions = admin.getRegions(tableName);
500      Assert.assertEquals(2, regions.size());
501      // snapshot
502      admin.snapshot(snapshotName, tableName);
503      // verify snapshot
504      try (TableSnapshotScanner scanner =
505        new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
506          new Scan().withStartRow(bbb).withStopRow(yyy))) {
507        verifyScanner(scanner, bbb, yyy);
508      }
509      // drop table
510      admin.disableTable(tableName);
511      admin.deleteTable(tableName);
512      // verify snapshot
513      try (TableSnapshotScanner scanner =
514        new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
515          new Scan().withStartRow(bbb).withStopRow(yyy))) {
516        verifyScanner(scanner, bbb, yyy);
517      }
518    }
519  }
520
521  private void traverseAndSetFileTime(Path path, long time) throws IOException {
522    fs.setTimes(path, time, -1);
523    if (fs.isDirectory(path)) {
524      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(path));
525      List<FileStatus> subDirs =
526        allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
527      List<FileStatus> files =
528        allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
529      for (FileStatus subDir : subDirs) {
530        traverseAndSetFileTime(subDir.getPath(), time);
531      }
532      for (FileStatus file : files) {
533        fs.setTimes(file.getPath(), time, -1);
534      }
535    }
536  }
537}