001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.List;
024import java.util.stream.Collectors;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileStatus;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellScanner;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HRegionInfo;
034import org.apache.hadoop.hbase.StartMiniClusterOption;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
037import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
038import org.apache.hadoop.hbase.regionserver.HRegion;
039import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
040import org.apache.hadoop.hbase.regionserver.HRegionServer;
041import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
042import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
043import org.apache.hadoop.hbase.testclassification.ClientTests;
044import org.apache.hadoop.hbase.testclassification.LargeTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.hadoop.hbase.util.CommonFSUtils;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.hadoop.hbase.util.FSUtils;
049import org.apache.hadoop.hbase.util.HFileArchiveUtil;
050import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
051import org.junit.After;
052import org.junit.Assert;
053import org.junit.Before;
054import org.junit.ClassRule;
055import org.junit.Rule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.junit.rules.TestName;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062@Category({ LargeTests.class, ClientTests.class })
063public class TestTableSnapshotScanner {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067    HBaseClassTestRule.forClass(TestTableSnapshotScanner.class);
068
069  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotScanner.class);
070  private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
071  private static final int NUM_REGION_SERVERS = 2;
072  private static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2") };
073  public static byte[] bbb = Bytes.toBytes("bbb");
074  public static byte[] yyy = Bytes.toBytes("yyy");
075
076  private FileSystem fs;
077  private Path rootDir;
078  private boolean clusterUp;
079
080  @Rule
081  public TestName name = new TestName();
082
083  public static void blockUntilSplitFinished(HBaseTestingUtility util, TableName tableName,
084    int expectedRegionSize) throws Exception {
085    for (int i = 0; i < 100; i++) {
086      List<HRegionInfo> hRegionInfoList = util.getAdmin().getTableRegions(tableName);
087      if (hRegionInfoList.size() >= expectedRegionSize) {
088        break;
089      }
090      Thread.sleep(1000);
091    }
092  }
093
094  @Before
095  public void setupCluster() throws Exception {
096    setupConf(UTIL.getConfiguration());
097    StartMiniClusterOption option =
098      StartMiniClusterOption.builder().numRegionServers(NUM_REGION_SERVERS)
099        .numDataNodes(NUM_REGION_SERVERS).createRootDir(true).build();
100    UTIL.startMiniCluster(option);
101    clusterUp = true;
102    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
103    fs = rootDir.getFileSystem(UTIL.getConfiguration());
104  }
105
106  @After
107  public void tearDownCluster() throws Exception {
108    if (clusterUp) {
109      UTIL.shutdownMiniCluster();
110    }
111  }
112
113  protected void setupConf(Configuration conf) {
114    // Enable snapshot
115    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
116  }
117
118  public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
119    String snapshotName, int numRegions) throws Exception {
120    try {
121      util.deleteTable(tableName);
122    } catch (Exception ex) {
123      // ignore
124    }
125
126    if (numRegions > 1) {
127      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
128    } else {
129      util.createTable(tableName, FAMILIES);
130    }
131    Admin admin = util.getAdmin();
132
133    // put some stuff in the table
134    Table table = util.getConnection().getTable(tableName);
135    util.loadTable(table, FAMILIES);
136
137    Path rootDir = CommonFSUtils.getRootDir(util.getConfiguration());
138    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
139
140    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES), null,
141      snapshotName, rootDir, fs, true);
142
143    // load different values
144    byte[] value = Bytes.toBytes("after_snapshot_value");
145    util.loadTable(table, FAMILIES, value);
146
147    // cause flush to create new files in the region
148    admin.flush(tableName);
149    table.close();
150  }
151
152  @Test
153  public void testNoDuplicateResultsWhenSplitting() throws Exception {
154    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
155    String snapshotName = "testSnapshotBug";
156    try {
157      if (UTIL.getAdmin().tableExists(tableName)) {
158        UTIL.deleteTable(tableName);
159      }
160
161      UTIL.createTable(tableName, FAMILIES);
162      Admin admin = UTIL.getAdmin();
163
164      // put some stuff in the table
165      Table table = UTIL.getConnection().getTable(tableName);
166      UTIL.loadTable(table, FAMILIES);
167
168      // split to 2 regions
169      admin.split(tableName, Bytes.toBytes("eee"));
170      blockUntilSplitFinished(UTIL, tableName, 2);
171
172      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
173      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
174
175      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
176        null, snapshotName, rootDir, fs, true);
177
178      // load different values
179      byte[] value = Bytes.toBytes("after_snapshot_value");
180      UTIL.loadTable(table, FAMILIES, value);
181
182      // cause flush to create new files in the region
183      admin.flush(tableName);
184      table.close();
185
186      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
187      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan
188
189      TableSnapshotScanner scanner =
190        new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
191
192      verifyScanner(scanner, bbb, yyy);
193      scanner.close();
194    } catch (Exception e) {
195      e.printStackTrace();
196    } finally {
197      UTIL.getAdmin().deleteSnapshot(snapshotName);
198      UTIL.deleteTable(tableName);
199    }
200  }
201
202  @Test
203  public void testScanLimit() throws Exception {
204    final TableName tableName = TableName.valueOf(name.getMethodName());
205    final String snapshotName = tableName + "Snapshot";
206    TableSnapshotScanner scanner = null;
207    try {
208      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
209      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
210      Scan scan = new Scan().withStartRow(bbb).setLimit(100); // limit the scan
211
212      scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
213      int count = 0;
214      while (true) {
215        Result result = scanner.next();
216        if (result == null) {
217          break;
218        }
219        count++;
220      }
221      Assert.assertEquals(100, count);
222    } finally {
223      if (scanner != null) {
224        scanner.close();
225      }
226      UTIL.getAdmin().deleteSnapshot(snapshotName);
227      UTIL.deleteTable(tableName);
228    }
229  }
230
231  @Test
232  public void testWithSingleRegion() throws Exception {
233    testScanner(UTIL, "testWithSingleRegion", 1, false);
234  }
235
236  @Test
237  public void testWithMultiRegion() throws Exception {
238    testScanner(UTIL, "testWithMultiRegion", 10, false);
239  }
240
241  @Test
242  public void testWithOfflineHBaseMultiRegion() throws Exception {
243    testScanner(UTIL, "testWithMultiRegion", 20, true);
244  }
245
246  @Test
247  public void testScannerWithRestoreScanner() throws Exception {
248    TableName tableName = TableName.valueOf("testScanner");
249    String snapshotName = "testScannerWithRestoreScanner";
250    try {
251      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
252      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
253      Scan scan = new Scan(bbb, yyy); // limit the scan
254
255      Configuration conf = UTIL.getConfiguration();
256      Path rootDir = CommonFSUtils.getRootDir(conf);
257
258      TableSnapshotScanner scanner0 =
259        new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
260      verifyScanner(scanner0, bbb, yyy);
261      scanner0.close();
262
263      // restore snapshot.
264      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
265
266      // scan the snapshot without restoring snapshot
267      TableSnapshotScanner scanner =
268        new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
269      verifyScanner(scanner, bbb, yyy);
270      scanner.close();
271
272      // check whether the snapshot has been deleted by the close of scanner.
273      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
274      verifyScanner(scanner, bbb, yyy);
275      scanner.close();
276
277      // restore snapshot again.
278      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
279
280      // check whether the snapshot has been deleted by the close of scanner.
281      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
282      verifyScanner(scanner, bbb, yyy);
283      scanner.close();
284    } finally {
285      UTIL.getAdmin().deleteSnapshot(snapshotName);
286      UTIL.deleteTable(tableName);
287    }
288  }
289
290  private void testScanner(HBaseTestingUtility util, String snapshotName, int numRegions,
291    boolean shutdownCluster) throws Exception {
292    TableName tableName = TableName.valueOf("testScanner");
293    try {
294      createTableAndSnapshot(util, tableName, snapshotName, numRegions);
295
296      if (shutdownCluster) {
297        util.shutdownMiniHBaseCluster();
298        clusterUp = false;
299      }
300
301      Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
302      Scan scan = new Scan(bbb, yyy); // limit the scan
303
304      TableSnapshotScanner scanner =
305        new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
306
307      verifyScanner(scanner, bbb, yyy);
308      scanner.close();
309    } finally {
310      if (clusterUp) {
311        util.getAdmin().deleteSnapshot(snapshotName);
312        util.deleteTable(tableName);
313      }
314    }
315  }
316
317  private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
318    throws IOException, InterruptedException {
319
320    HBaseTestingUtility.SeenRowTracker rowTracker =
321      new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
322
323    while (true) {
324      Result result = scanner.next();
325      if (result == null) {
326        break;
327      }
328      verifyRow(result);
329      rowTracker.addRow(result.getRow());
330    }
331
332    // validate all rows are seen
333    rowTracker.validate();
334  }
335
336  private static void verifyRow(Result result) throws IOException {
337    byte[] row = result.getRow();
338    CellScanner scanner = result.cellScanner();
339    while (scanner.advance()) {
340      Cell cell = scanner.current();
341
342      // assert that all Cells in the Result have the same key
343      Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length, cell.getRowArray(),
344        cell.getRowOffset(), cell.getRowLength()));
345    }
346
347    for (int j = 0; j < FAMILIES.length; j++) {
348      byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
349      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
350        + " ,actual:" + Bytes.toString(actual), row, actual);
351    }
352  }
353
354  @Test
355  public void testMergeRegion() throws Exception {
356    TableName tableName = TableName.valueOf("testMergeRegion");
357    String snapshotName = tableName.getNameAsString() + "_snapshot";
358    Configuration conf = UTIL.getConfiguration();
359    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
360    long timeout = 20000; // 20s
361    try (Admin admin = UTIL.getAdmin()) {
362      List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
363        .collect(Collectors.toList());
364      // create table with 3 regions
365      Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3);
366      List<RegionInfo> regions = admin.getRegions(tableName);
367      Assert.assertEquals(3, regions.size());
368      RegionInfo region0 = regions.get(0);
369      RegionInfo region1 = regions.get(1);
370      RegionInfo region2 = regions.get(2);
371      // put some data in the table
372      UTIL.loadTable(table, FAMILIES);
373      admin.flush(tableName);
374      // wait flush is finished
375      UTIL.waitFor(timeout, () -> {
376        try {
377          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
378          for (RegionInfo region : regions) {
379            Path regionDir = new Path(tableDir, region.getEncodedName());
380            for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
381              for (FileStatus fs : fs.listStatus(familyDir)) {
382                if (!fs.getPath().getName().equals(".filelist")) {
383                  return true;
384                }
385              }
386              return false;
387            }
388          }
389          return true;
390        } catch (IOException e) {
391          LOG.warn("Failed check if flush is finished", e);
392          return false;
393        }
394      });
395      // merge 2 regions
396      admin.compactionSwitch(false, serverList);
397      admin.mergeRegionsAsync(region0.getEncodedNameAsBytes(), region1.getEncodedNameAsBytes(),
398        true);
399      UTIL.waitFor(timeout, () -> admin.getRegions(tableName).size() == 2);
400      List<RegionInfo> mergedRegions = admin.getRegions(tableName);
401      RegionInfo mergedRegion =
402        mergedRegions.get(0).getEncodedName().equals(region2.getEncodedName())
403          ? mergedRegions.get(1)
404          : mergedRegions.get(0);
405      // snapshot
406      admin.snapshot(snapshotName, tableName);
407      Assert.assertEquals(1, admin.listSnapshots().size());
408      // major compact
409      admin.compactionSwitch(true, serverList);
410      admin.majorCompactRegion(mergedRegion.getRegionName());
411      // wait until merged region has no reference
412      UTIL.waitFor(timeout, () -> {
413        try {
414          for (RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
415            .getRegionServerThreads()) {
416            HRegionServer regionServer = regionServerThread.getRegionServer();
417            for (HRegion subRegion : regionServer.getRegions(tableName)) {
418              if (
419                subRegion.getRegionInfo().getEncodedName().equals(mergedRegion.getEncodedName())
420              ) {
421                regionServer.getCompactedHFilesDischarger().chore();
422              }
423            }
424          }
425          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
426          HRegionFileSystem regionFs = HRegionFileSystem
427            .openRegionFromFileSystem(UTIL.getConfiguration(), fs, tableDir, mergedRegion, true);
428          return !regionFs.hasReferences(admin.getDescriptor(tableName));
429        } catch (IOException e) {
430          LOG.warn("Failed check merged region has no reference", e);
431          return false;
432        }
433      });
434      // run catalog janitor to clean and wait for parent regions are archived
435      UTIL.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting();
436      UTIL.waitFor(timeout, () -> {
437        try {
438          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
439          for (FileStatus fileStatus : fs.listStatus(tableDir)) {
440            String name = fileStatus.getPath().getName();
441            if (name.equals(region0.getEncodedName()) || name.equals(region1.getEncodedName())) {
442              return false;
443            }
444          }
445          return true;
446        } catch (IOException e) {
447          LOG.warn("Check if parent regions are archived error", e);
448          return false;
449        }
450      });
451      // set file modify time and then run cleaner
452      long time = EnvironmentEdgeManager.currentTime() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
453      traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
454      UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().triggerCleanerNow().get();
455      // scan snapshot
456      try (TableSnapshotScanner scanner = new TableSnapshotScanner(conf,
457        UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName, new Scan(bbb, yyy))) {
458        verifyScanner(scanner, bbb, yyy);
459      }
460    } catch (Exception e) {
461      LOG.error("scan snapshot error", e);
462      Assert.fail("Should not throw Exception: " + e.getMessage());
463      Assert.fail("Should not throw FileNotFoundException");
464      Assert.assertTrue(e.getCause() != null);
465      Assert.assertTrue(e.getCause().getCause() instanceof FileNotFoundException);
466    }
467  }
468
469  @Test
470  public void testDeleteTableWithMergedRegions() throws Exception {
471    final TableName tableName = TableName.valueOf(this.name.getMethodName());
472    String snapshotName = tableName.getNameAsString() + "_snapshot";
473    Configuration conf = UTIL.getConfiguration();
474    try (Admin admin = UTIL.getConnection().getAdmin()) {
475      // disable compaction
476      admin.compactionSwitch(false,
477        admin.getRegionServers().stream().map(s -> s.getServerName()).collect(Collectors.toList()));
478      // create table
479      Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3);
480      List<RegionInfo> regions = admin.getRegions(tableName);
481      Assert.assertEquals(3, regions.size());
482      // write some data
483      UTIL.loadTable(table, FAMILIES);
484      // merge region
485      admin.mergeRegionsAsync(new byte[][] { regions.get(0).getEncodedNameAsBytes(),
486        regions.get(1).getEncodedNameAsBytes() }, false).get();
487      regions = admin.getRegions(tableName);
488      Assert.assertEquals(2, regions.size());
489      // snapshot
490      admin.snapshot(snapshotName, tableName);
491      // verify snapshot
492      try (TableSnapshotScanner scanner =
493        new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
494          new Scan().withStartRow(bbb).withStopRow(yyy))) {
495        verifyScanner(scanner, bbb, yyy);
496      }
497      // drop table
498      admin.disableTable(tableName);
499      admin.deleteTable(tableName);
500      // verify snapshot
501      try (TableSnapshotScanner scanner =
502        new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
503          new Scan().withStartRow(bbb).withStopRow(yyy))) {
504        verifyScanner(scanner, bbb, yyy);
505      }
506    }
507  }
508
509  private void traverseAndSetFileTime(Path path, long time) throws IOException {
510    fs.setTimes(path, time, -1);
511    if (fs.isDirectory(path)) {
512      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(path));
513      List<FileStatus> subDirs =
514        allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
515      List<FileStatus> files =
516        allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
517      for (FileStatus subDir : subDirs) {
518        traverseAndSetFileTime(subDir.getPath(), time);
519      }
520      for (FileStatus file : files) {
521        fs.setTimes(file.getPath(), time, -1);
522      }
523    }
524  }
525}