/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.example;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spin up a small cluster and check that the hfiles of a region are properly
 * long-term archived as specified via the {@link ZKTableArchiveClient}.
 */
@Category({ MiscTests.class, MediumTests.class })
public class TestZooKeeperTableArchiveClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZooKeeperTableArchiveClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestZooKeeperTableArchiveClient.class);
  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
  private static final String STRING_TABLE_NAME = "test";
  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
  private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
  private static ZKTableArchiveClient archivingClient;
  private final List<Path> toCleanup = new ArrayList<>();
  private static ClusterConnection CONNECTION;
  private static RegionServerServices rss;
  private static DirScanPool POOL;

  /**
   * Setup the config for the cluster
   */
  @BeforeClass
  public static void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniZKCluster();
    CONNECTION = (ClusterConnection) ConnectionFactory.createConnection(UTIL.getConfiguration());
    archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION);
    // make hfile archiving node so we can archive files
    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
    String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
    ZKUtil.createWithParents(watcher, archivingZNode);
    rss = mock(RegionServerServices.class);
    POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
  }

  private static void setupConf(Configuration conf) {
    // only compact with 3 files
    conf.setInt("hbase.hstore.compaction.min", 3);
  }

  @After
  public void tearDown() throws Exception {
    try {
      FileSystem fs = UTIL.getTestFileSystem();
      // cleanup each of the files/directories registered
      for (Path file : toCleanup) {
        // remove the table and archive directories
        CommonFSUtils.delete(fs, file, true);
      }
    } catch (IOException e) {
      LOG.warn("Failure to delete archive directory", e);
    } finally {
      toCleanup.clear();
    }
    // make sure that backups are off for all tables
    archivingClient.disableHFileBackup();
  }

  @AfterClass
  public static void cleanupTest() throws Exception {
    if (CONNECTION != null) {
      CONNECTION.close();
    }
    UTIL.shutdownMiniZKCluster();
    if (POOL != null) {
      POOL.shutdownNow();
    }
  }

  /**
   * Test turning on/off archiving
   */
  @Test
  public void testArchivingEnableDisable() throws Exception {
    // 1. turn on hfile backups
    LOG.debug("----Starting archiving");
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 2. Turn off archiving and make sure it's off
    archivingClient.disableHFileBackup();
    assertFalse("Archiving didn't get turned off.",
      archivingClient.getArchivingEnabled(TABLE_NAME));

    // 3. Check enable/disable on a single table
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 4. Turn off archiving for the single table and make sure it's off
    archivingClient.disableHFileBackup(TABLE_NAME);
    assertFalse("Archiving didn't get turned off for " + STRING_TABLE_NAME,
      archivingClient.getArchivingEnabled(TABLE_NAME));
  }

  @Test
  public void testArchivingOnSingleTable() throws Exception {
    createArchiveDirectory();
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);

    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // get the current hfiles in the archive directory
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
      throw new RuntimeException("Didn't archive any files!");
    }
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());

    runCleaner(cleaner, finished, stop);

    // we know the cleaner ran, so now check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles);

    // but we still have the archive directory
    assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
  }

  /**
   * Test archiving/cleaning across multiple tables, where some are retained, and others aren't
   * @throws Exception on failure
   */
  @Test
  public void testMultipleTables() throws Exception {
    createArchiveDirectory();
    String otherTable = "otherTable";

    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    Path otherTableDir = getTableDir(otherTable);

    // register cleanup for the created directories
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);
    toCleanup.add(otherTableDir);
    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // create another table that we don't archive
    hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
    regions = new ArrayList<>();
    regions.add(otherRegion);
    doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner1 =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(otherRegion, TEST_FAM);
    compactionCleaner1.chore();
    // get the current hfiles in the archive directory; these should have been archived
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, archiveDir, LOG);
      throw new RuntimeException("Didn't archive any files!");
    }

    // make sure we have files from both tables
    int initialCountForPrimary = 0;
    int initialCountForOtherTable = 0;
    for (Path file : files) {
      String tableName = file.getParent().getParent().getParent().getName();
      // check to which table this file belongs
      if (tableName.equals(otherTable)) {
        initialCountForOtherTable++;
      } else if (tableName.equals(STRING_TABLE_NAME)) {
        initialCountForPrimary++;
      }
    }

    assertTrue("Didn't archive files for: " + STRING_TABLE_NAME, initialCountForPrimary > 0);
    assertTrue("Didn't archive files for: " + otherTable, initialCountForOtherTable > 0);

    // run the cleaners, checking for each of the directories + files (both should be deleted and
    // need to be checked) in 'otherTable' and the files (which should be retained) in the 'table'
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3);
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");

    // we know the cleaner ran, so now check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    int archivedForPrimary = 0;
    for (Path file : archivedFiles) {
      String tableName = file.getParent().getParent().getParent().getName();
      // ensure we don't have files from the non-archived table
      assertFalse("Have a file from the non-archived table: " + file, tableName.equals(otherTable));
      if (tableName.equals(STRING_TABLE_NAME)) {
        archivedForPrimary++;
      }
    }

    assertEquals("Not all archived files for the primary table were retained.",
      initialCountForPrimary, archivedForPrimary);

    // but we still have the archive directory
    assertTrue("Archive directory was deleted via archiver", fs.exists(archiveDir));
  }

  private void createArchiveDirectory() throws IOException {
    // create the archive and test directory
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    fs.mkdirs(archiveDir);
  }

  private Path getArchiveDir() throws IOException {
    return new Path(UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
  }

  private Path getTableDir(String tableName) throws IOException {
    Path testDataDir = UTIL.getDataTestDir();
    // point the root dir at the test data dir so the table directory resolves beneath it
    CommonFSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
    return new Path(testDataDir, tableName);
  }

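  /**
   * Create an {@link HFileCleaner} over the given archive directory whose only configured
   * delegate is the {@link LongTermArchivingHFileCleaner}.
   */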
  private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
    Stoppable stop) {
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      LongTermArchivingHFileCleaner.class.getCanonicalName());
    return new HFileCleaner(1000, stop, conf, fs, archiveDir, POOL);
  }

  /**
   * Start archiving the given table for the given hfile cleaner
   * @param tableName table to archive
   * @param cleaner   cleaner to check to make sure the change propagated
   * @return the cleaner's delegates, the first of which is the underlying
   *         {@link LongTermArchivingHFileCleaner} that is managing archiving
   * @throws IOException     on failure
   * @throws KeeperException on failure
   */
  @SuppressWarnings("checkstyle:EmptyBlock")
  private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
    throws IOException, KeeperException {
    // turn on hfile retention
    LOG.debug("----Starting archiving for table: " + tableName);
    archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(tableName));

    // wait for the archiver to get the notification
    List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting();
    LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    while (!delegate.archiveTracker.keepHFiles(STRING_TABLE_NAME)) {
      // spin until propagation - should be fast
    }
    return cleaners;
  }

  /**
   * Spy on the {@link LongTermArchivingHFileCleaner} to ensure we can catch when the cleaner has
   * seen all the files
   * @return a {@link CountDownLatch} to wait on that releases when the cleaner has been called at
   *         least the expected number of times.
   */
  private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
    List<BaseHFileCleanerDelegate> cleaners, final int expected) {
    // replace the cleaner with one that we can check
    BaseHFileCleanerDelegate delegateSpy = spy(cleaner);
    final int[] counter = new int[] { 0 };
    final CountDownLatch finished = new CountDownLatch(1);
    doAnswer(new Answer<Iterable<FileStatus>>() {

      @Override
      public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable {
        counter[0]++;
        LOG.debug(counter[0] + "/ " + expected + ") Wrapping call to getDeletableFiles for files: "
          + invocation.getArgument(0));

        @SuppressWarnings("unchecked")
        Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
        if (counter[0] >= expected) {
          finished.countDown();
        }

        return ret;
      }
    }).when(delegateSpy).getDeletableFiles(anyList());
    cleaners.set(0, delegateSpy);

    return finished;
  }

  /**
   * Get all the files (non-directory entries) in the file system under the passed directory
   * @param dir directory to investigate
   * @return all files under the directory
   */
  private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
    FileStatus[] files = CommonFSUtils.listStatus(fs, dir, null);
    if (files == null) {
      LOG.warn("No files under: " + dir);
      return null;
    }

    List<Path> allFiles = new ArrayList<>();
    for (FileStatus file : files) {
      if (file.isDirectory()) {
        List<Path> subFiles = getAllFiles(fs, file.getPath());

        if (subFiles != null) {
          allFiles.addAll(subFiles);
        }

        continue;
      }
      allFiles.add(file.getPath());
    }
    return allFiles;
  }
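
  /**
   * Create two store files in the given region and then compact them into one, so the original
   * files become candidates for the archive.
   */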
  private void loadFlushAndCompact(HRegion region, byte[] family) throws IOException {
    // create two hfiles in the region
    createHFileInRegion(region, family);
    createHFileInRegion(region, family);

    HStore s = region.getStore(family);
    int count = s.getStorefilesCount();
    assertTrue("Don't have the expected store files, wanted >= 2 store files, but was: " + count,
      count >= 2);

    // compact the two files into one file to get files in the archive
    LOG.debug("Compacting stores");
    region.compact(true);
  }

  /**
   * Create a new hfile in the passed region
   * @param region       region to operate on
   * @param columnFamily family for which to add data
   * @throws IOException if doing the put or flush fails
   */
  private void createHFileInRegion(HRegion region, byte[] columnFamily) throws IOException {
    // put one row in the region
    Put p = new Put(Bytes.toBytes("row"));
    p.addColumn(columnFamily, Bytes.toBytes("Qual"), Bytes.toBytes("v1"));
    region.put(p);
    // flush the region to make a store file
    region.flush(true);
  }

  /**
   * Run the given cleaner until the latch releases, then stop it
   * @param cleaner  the cleaner to use
   * @param finished latch that releases once the cleaner has checked the expected files
   * @param stop     stoppable used to halt the cleaner chore
   */
  private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
    throws InterruptedException {
    final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME");
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");
  }
}