/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.example;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spin up a small cluster and check that the hfiles of a region are properly long-term archived as
 * specified via the {@link ZKTableArchiveClient}.
 */
@Category({ MiscTests.class, MediumTests.class })
public class TestZooKeeperTableArchiveClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZooKeeperTableArchiveClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestZooKeeperTableArchiveClient.class);
  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
  private static final String STRING_TABLE_NAME = "test";
  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
  private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
  private static ZKTableArchiveClient archivingClient;
  private final List<Path> toCleanup = new ArrayList<>();
  private static ClusterConnection CONNECTION;
  private static RegionServerServices rss;
  private static DirScanPool POOL;

  /**
   * Set up the config for the cluster
   */
  @BeforeClass
  public static void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniZKCluster();
    CONNECTION = (ClusterConnection) ConnectionFactory.createConnection(UTIL.getConfiguration());
    archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION);
    // create the hfile archiving znode so we can archive files
    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
    String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
    ZKUtil.createWithParents(watcher, archivingZNode);
    rss = mock(RegionServerServices.class);
    POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
  }

  private static void setupConf(Configuration conf) {
    // only compact with 3 files
    conf.setInt("hbase.hstore.compaction.min", 3);
  }

  @After
  public void tearDown() throws Exception {
    try {
      FileSystem fs = UTIL.getTestFileSystem();
      // cleanup each of the files/directories registered
      for (Path file : toCleanup) {
        // remove the table and archive directories
        CommonFSUtils.delete(fs, file, true);
      }
    } catch (IOException e) {
      LOG.warn("Failure to delete archive directory", e);
    } finally {
      toCleanup.clear();
    }
    // make sure that backups are off for all tables
    archivingClient.disableHFileBackup();
  }

  @AfterClass
  public static void cleanupTest() throws Exception {
    if (CONNECTION != null) {
      CONNECTION.close();
    }
    UTIL.shutdownMiniZKCluster();
    if (POOL != null) {
      POOL.shutdownNow();
    }
  }

  /**
   * Test turning on/off archiving
   */
  @Test
  public void testArchivingEnableDisable() throws Exception {
    // 1. turn on hfile backups
    LOG.debug("----Starting archiving");
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 2. Turn off archiving and make sure it's off
    archivingClient.disableHFileBackup();
    assertFalse("Archiving didn't get turned off.",
      archivingClient.getArchivingEnabled(TABLE_NAME));

    // 3. Check enable/disable on a single table
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 4. Turn off archiving and make sure it's off
    archivingClient.disableHFileBackup(TABLE_NAME);
    assertFalse("Archiving didn't get turned off for " + STRING_TABLE_NAME,
      archivingClient.getArchivingEnabled(TABLE_NAME));
  }

  @Test
  public void testArchivingOnSingleTable() throws Exception {
    createArchiveDirectory();
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);

    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
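    // make the mocked region server report our test region so the discharger can see it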
    doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // get the current hfiles in the archive directory
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
      throw new RuntimeException("Didn't archive any files!");
    }
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());

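    // run the cleaner and block until it has checked every file currently in the archive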
    runCleaner(cleaner, finished, stop);

    // now that the cleaner has run, check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles);

    // but we still have the archive directory
    assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
  }

  /**
   * Test archiving/cleaning across multiple tables, where some are retained, and others aren't
   * @throws Exception on failure
   */
  @Test
  public void testMultipleTables() throws Exception {
    createArchiveDirectory();
    String otherTable = "otherTable";

    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    Path otherTableDir = getTableDir(otherTable);

    // register cleanup for the created directories
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);
    toCleanup.add(otherTableDir);
    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // create another table that we don't archive
    hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
    regions = new ArrayList<>();
    regions.add(otherRegion);
    doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner1 =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(otherRegion, TEST_FAM);
    compactionCleaner1.chore();
    // get the current hfiles in the archive directory
    // Should be archived
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, archiveDir, LOG);
      throw new RuntimeException("Didn't archive any files!");
    }

    // make sure we have files from both tables
    int initialCountForPrimary = 0;
    int initialCountForOtherTable = 0;
    for (Path file : files) {
      String tableName = file.getParent().getParent().getParent().getName();
      // check to which table this file belongs
      if (tableName.equals(otherTable)) {
        initialCountForOtherTable++;
      } else if (tableName.equals(STRING_TABLE_NAME)) {
        initialCountForPrimary++;
      }
    }

    assertTrue("Didn't archive files for:" + STRING_TABLE_NAME, initialCountForPrimary > 0);
    assertTrue("Didn't archive files for:" + otherTable, initialCountForOtherTable > 0);

    // run the cleaner, checking each of the directories and files under 'otherTable' (all of which
    // should be deleted) and the files under 'table' (which should be retained)
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3);
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");
    // now that the cleaner has run, check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    int archivedForPrimary = 0;
    for (Path file : archivedFiles) {
      String tableName = file.getParent().getParent().getParent().getName();
      // ensure we don't have files from the non-archived table
      assertFalse("Have a file from the non-archived table: " + file, tableName.equals(otherTable));
      if (tableName.equals(STRING_TABLE_NAME)) {
        archivedForPrimary++;
      }
    }

    assertEquals("Not all archived files for the primary table were retained.",
      initialCountForPrimary, archivedForPrimary);

    // but we still have the archive directory
    assertTrue("Archive directory was deleted via archiver", fs.exists(archiveDir));
  }

  private void createArchiveDirectory() throws IOException {
    // create the archive and test directory
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    fs.mkdirs(archiveDir);
  }

  private Path getArchiveDir() throws IOException {
    return new Path(UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
  }

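  /**
   * Get the directory for the given table under the test data dir, also setting the test data dir
   * as the HBase root dir.
   */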
  private Path getTableDir(String tableName) throws IOException {
    Path testDataDir = UTIL.getDataTestDir();
    CommonFSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
    return new Path(testDataDir, tableName);
  }

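  /**
   * Register the {@link LongTermArchivingHFileCleaner} as a cleaner plugin and create an
   * {@link HFileCleaner} over the archive directory.
   */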
  private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
    Stoppable stop) {
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      LongTermArchivingHFileCleaner.class.getCanonicalName());
    return new HFileCleaner(1000, stop, conf, fs, archiveDir, POOL);
  }

  /**
   * Start archiving the given table and wait for the change to propagate to the given hfile
   * cleaner
   * @param tableName table to archive
   * @param cleaner   cleaner to check to make sure change propagated
   * @return the cleaner's delegates, the first of which is the
   *         {@link LongTermArchivingHFileCleaner} that is managing archiving
   * @throws IOException     on failure
   * @throws KeeperException on failure
   */
  @SuppressWarnings("checkstyle:EmptyBlock")
  private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
    throws IOException, KeeperException {
    // turn on hfile retention
    LOG.debug("----Starting archiving for table:" + tableName);
    archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(tableName));

    // wait for the archiver to get the notification
    List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting();
    LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    while (!delegate.archiveTracker.keepHFiles(STRING_TABLE_NAME)) {
      // spin until propagation - should be fast
    }
    return cleaners;
  }

  /**
   * Spy on the {@link LongTermArchivingHFileCleaner} to ensure we can catch when the cleaner has
   * seen all the files
   * @return a {@link CountDownLatch} to wait on that releases when the cleaner has been called at
   *         least the expected number of times.
   */
  private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
    List<BaseHFileCleanerDelegate> cleaners, final int expected) {
    // replace the cleaner with one that we can check
    BaseHFileCleanerDelegate delegateSpy = spy(cleaner);
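    // single-element array so the anonymous Answer below can mutate the call count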
    final int[] counter = new int[] { 0 };
    final CountDownLatch finished = new CountDownLatch(1);
    doAnswer(new Answer<Iterable<FileStatus>>() {

      @Override
      public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable {
        counter[0]++;
        LOG.debug(counter[0] + "/" + expected + " Wrapping call to getDeletableFiles for files: "
          + invocation.getArgument(0));

        @SuppressWarnings("unchecked")
        Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
        if (counter[0] >= expected) {
          finished.countDown();
        }

        return ret;
      }
    }).when(delegateSpy).getDeletableFiles(anyList());
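    // swap the spy into the cleaner's delegate list so the chore exercises the wrapped delegate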
    cleaners.set(0, delegateSpy);

    return finished;
  }

  /**
   * Get all the files (non-directory entries) in the file system under the passed directory
   * @param fs  filesystem to search
   * @param dir directory to investigate
   * @return all files under the directory
   */
  private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
    FileStatus[] files = CommonFSUtils.listStatus(fs, dir, null);
    if (files == null) {
      LOG.warn("No files under: " + dir);
      return null;
    }

    List<Path> allFiles = new ArrayList<>();
    for (FileStatus file : files) {
      if (file.isDirectory()) {
        List<Path> subFiles = getAllFiles(fs, file.getPath());

        if (subFiles != null) {
          allFiles.addAll(subFiles);
        }

        continue;
      }
      allFiles.add(file.getPath());
    }
    return allFiles;
  }

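  /**
   * Load the region with a couple of rows, flush to create store files, and compact them so there
   * are hfiles to put in the archive.
   */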
  private void loadFlushAndCompact(HRegion region, byte[] family) throws IOException {
    // create two hfiles in the region
    createHFileInRegion(region, family);
    createHFileInRegion(region, family);

    HStore s = region.getStore(family);
    int count = s.getStorefilesCount();
    assertTrue("Expected at least 2 store files, but was: " + count, count >= 2);

    // compact the two files into one file to get files in the archive
    LOG.debug("Compacting stores");
    region.compact(true);
  }

  /**
   * Create a new hfile in the passed region
   * @param region       region to operate on
   * @param columnFamily family for which to add data
   * @throws IOException if doing the put or flush fails
   */
  private void createHFileInRegion(HRegion region, byte[] columnFamily) throws IOException {
    // put one row in the region
    Put p = new Put(Bytes.toBytes("row"));
    p.addColumn(columnFamily, Bytes.toBytes("Qual"), Bytes.toBytes("v1"));
    region.put(p);
    // flush the region to make a store file
    region.flush(true);
  }

  /**
   * Run the given cleaner until it has checked all the expected files, then stop it
   * @param cleaner  the cleaner to use
   * @param finished latch that is released once the cleaner has seen all the files
   * @param stop     stoppable used to halt the cleaner when it is done
   */
  private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
    throws InterruptedException {
    final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME");
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");
  }
}