/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.example;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.DoNothingConnectionRegistry;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spin up a small cluster and check that the hfiles of a region are properly long-term archived as
 * specified via the {@link ZKTableArchiveClient}.
 */
@Category({ MiscTests.class, MediumTests.class })
public class TestZooKeeperTableArchiveClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZooKeeperTableArchiveClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestZooKeeperTableArchiveClient.class);
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static final String STRING_TABLE_NAME = "test";
  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
  private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
  private static ZKTableArchiveClient archivingClient;
  private final List<Path> toCleanup = new ArrayList<>();
  private static Connection CONNECTION;
  private static RegionServerServices rss;
  private static DirScanPool POOL;

  public static final class MockRegistry extends DoNothingConnectionRegistry {

    public MockRegistry(Configuration conf, User user) {
      super(conf, user);
    }

    @Override
    public CompletableFuture<String> getClusterId() {
      return CompletableFuture.completedFuture("clusterId");
    }
  }

  /**
   * Set up the config for the cluster
   */
  @BeforeClass
  public static void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniZKCluster();
    UTIL.getConfiguration().setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
      MockRegistry.class, ConnectionRegistry.class);
    CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
    archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION);
    // make hfile archiving node so we can archive files
    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
    String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
    ZKUtil.createWithParents(watcher, archivingZNode);
    rss = mock(RegionServerServices.class);
    POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
  }

  private static void setupConf(Configuration conf) {
    // only compact with 3 files
    conf.setInt("hbase.hstore.compaction.min", 3);
  }
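
  /**
   * Delete the directories registered by each test and make sure archiving is disabled again, so
   * every test starts from a clean slate.
   */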
  @After
  public void tearDown() throws Exception {
    try {
      FileSystem fs = UTIL.getTestFileSystem();
      // cleanup each of the files/directories registered
      for (Path file : toCleanup) {
        // remove the table and archive directories
        CommonFSUtils.delete(fs, file, true);
      }
    } catch (IOException e) {
      LOG.warn("Failure to delete archive directory", e);
    } finally {
      toCleanup.clear();
    }
    // make sure that backups are off for all tables
    archivingClient.disableHFileBackup();
  }

  @AfterClass
  public static void cleanupTest() throws Exception {
    if (CONNECTION != null) {
      CONNECTION.close();
    }
    UTIL.shutdownMiniZKCluster();
    if (POOL != null) {
      POOL.shutdownNow();
    }
  }

  /**
   * Test turning archiving on and off
   */
  @Test
  public void testArchivingEnableDisable() throws Exception {
    // 1. turn on hfile backups
    LOG.debug("----Starting archiving");
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 2. Turn off archiving and make sure it's off
    archivingClient.disableHFileBackup();
    assertFalse("Archiving didn't get turned off.",
      archivingClient.getArchivingEnabled(TABLE_NAME));

    // 3. Check enable/disable on a single table
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 4. Turn off archiving and make sure it's off
    archivingClient.disableHFileBackup(TABLE_NAME);
    assertFalse("Archiving didn't get turned off for " + STRING_TABLE_NAME,
      archivingClient.getArchivingEnabled(TABLE_NAME));
  }
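
  /**
   * Test that archived hfiles for a table with archiving enabled survive a run of the
   * {@link HFileCleaner}.
   */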
  @Test
  public void testArchivingOnSingleTable() throws Exception {
    createArchiveDirectory();
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);

    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // get the current hfiles in the archive directory
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
      throw new RuntimeException("Didn't archive any files!");
    }
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());

    runCleaner(cleaner, finished, stop);

    // we know the cleaner ran, so now check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles);

    // but we still have the archive directory
    assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
  }
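
  /*
   * Note on path structure: testMultipleTables recovers the owning table of each archived file
   * from its path. With tables laid out directly under the test root dir (see getTableDir), an
   * archived file is expected at <archiveDir>/<table>/<region>/<family>/<hfile>, so three
   * getParent() calls walk from the hfile back up to the table directory.
   */
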
  /**
   * Test archiving/cleaning across multiple tables, where some are retained, and others aren't
   * @throws Exception on failure
   */
  @Test
  public void testMultipleTables() throws Exception {
    createArchiveDirectory();
    String otherTable = "otherTable";

    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    Path otherTableDir = getTableDir(otherTable);

    // register cleanup for the created directories
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);
    toCleanup.add(otherTableDir);
    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // create another table that we don't archive
    hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
    regions = new ArrayList<>();
    regions.add(otherRegion);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner1 =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(otherRegion, TEST_FAM);
    compactionCleaner1.chore();
    // get the current hfiles in the archive directory; both tables should have archived files
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, archiveDir, LOG);
      throw new RuntimeException("Didn't archive any files!");
    }

    // make sure we have files from both tables
    int initialCountForPrimary = 0;
    int initialCountForOtherTable = 0;
    for (Path file : files) {
      String tableName = file.getParent().getParent().getParent().getName();
      // check to which table this file belongs
      if (tableName.equals(otherTable)) {
        initialCountForOtherTable++;
      } else if (tableName.equals(STRING_TABLE_NAME)) {
        initialCountForPrimary++;
      }
    }

    assertTrue("Didn't archive files for:" + STRING_TABLE_NAME, initialCountForPrimary > 0);
    assertTrue("Didn't archive files for:" + otherTable, initialCountForOtherTable > 0);

    // run the cleaners, checking for each of the directories + files (both should be deleted and
    // need to be checked) in 'otherTable' and the files (which should be retained) in the 'table'
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3);
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");

    // we know the cleaner ran, so now check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    int archivedForPrimary = 0;
    for (Path file : archivedFiles) {
      String tableName = file.getParent().getParent().getParent().getName();
      // ensure we don't have files from the non-archived table
      assertFalse("Have a file from the non-archived table: " + file,
        tableName.equals(otherTable));
      if (tableName.equals(STRING_TABLE_NAME)) {
        archivedForPrimary++;
      }
    }

    assertEquals("Not all archived files for the primary table were retained.",
      initialCountForPrimary, archivedForPrimary);

    // but we still have the archive directory
    assertTrue("Archive directory was deleted via archiver", fs.exists(archiveDir));
  }
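
  /** Create the archive directory under the test data dir so the cleaners have a dir to scan. */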
  private void createArchiveDirectory() throws IOException {
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    fs.mkdirs(archiveDir);
  }

  private Path getArchiveDir() throws IOException {
    return new Path(UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
  }

  private Path getTableDir(String tableName) throws IOException {
    Path testDataDir = UTIL.getDataTestDir();
    // tables are laid out directly under the test data dir, so point the root dir there too
    CommonFSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
    return new Path(testDataDir, tableName);
  }

  private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
    Stoppable stop) {
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      LongTermArchivingHFileCleaner.class.getCanonicalName());
    return new HFileCleaner(1000, stop, conf, fs, archiveDir, POOL);
  }

  /**
   * Start archiving the given table and wait until the cleaner has seen the change
   * @param tableName table to archive
   * @param cleaner   cleaner to check to make sure change propagated
   * @return the cleaner's delegates; the first entry is the
   *         {@link LongTermArchivingHFileCleaner} that is managing archiving
   * @throws IOException     on failure
   * @throws KeeperException on failure
   */
  @SuppressWarnings("checkstyle:EmptyBlock")
  private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
    throws IOException, KeeperException {
    // turn on hfile retention
    LOG.debug("----Starting archiving for table:" + tableName);
    archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(tableName));

    // wait for the archiver to get the notification
    List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting();
    LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    while (!delegate.archiveTracker.keepHFiles(tableName)) {
      // spin until propagation - should be fast
    }
    return cleaners;
  }

  /**
   * Spy on the {@link LongTermArchivingHFileCleaner} to ensure we can catch when the cleaner has
   * seen all the files
   * @return a {@link CountDownLatch} to wait on that releases when the cleaner has been called at
   *         least the expected number of times.
   */
  private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
    List<BaseHFileCleanerDelegate> cleaners, final int expected) {
    // replace the cleaner with one that we can check
    BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
    final int[] counter = new int[] { 0 };
    final CountDownLatch finished = new CountDownLatch(1);
    Mockito.doAnswer(new Answer<Iterable<FileStatus>>() {

      @Override
      public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable {
        counter[0]++;
        LOG.debug(counter[0] + "/" + expected + ") Wrapping call to getDeletableFiles for files: "
          + invocation.getArgument(0));

        @SuppressWarnings("unchecked")
        Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
        if (counter[0] >= expected) {
          finished.countDown();
        }

        return ret;
      }
    }).when(delegateSpy).getDeletableFiles(Mockito.anyList());
    cleaners.set(0, delegateSpy);

    return finished;
  }

  /**
   * Get all the files (non-directory entries) in the file system under the passed directory
   * @param dir directory to investigate
   * @return all files under the directory, or null if there are none
   */
  private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
    FileStatus[] files = CommonFSUtils.listStatus(fs, dir, null);
    if (files == null) {
      LOG.warn("No files under:" + dir);
      return null;
    }

    List<Path> allFiles = new ArrayList<>();
    for (FileStatus file : files) {
      if (file.isDirectory()) {
        List<Path> subFiles = getAllFiles(fs, file.getPath());

        if (subFiles != null) {
          allFiles.addAll(subFiles);
        }

        continue;
      }
      allFiles.add(file.getPath());
    }
    return allFiles;
  }
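
  /**
   * Load two hfiles into the given family of the region, then major compact so the originals are
   * moved to the archive.
   */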
  private void loadFlushAndCompact(HRegion region, byte[] family) throws IOException {
    // create two hfiles in the region
    createHFileInRegion(region, family);
    createHFileInRegion(region, family);

    HStore s = region.getStore(family);
    int count = s.getStorefilesCount();
    assertTrue("Don't have the expected store files, wanted >= 2 store files, but was:" + count,
      count >= 2);

    // compact the two files into one file to get files in the archive
    LOG.debug("Compacting stores");
    region.compact(true);
  }

  /**
   * Create a new hfile in the passed region
   * @param region       region to operate on
   * @param columnFamily family for which to add data
   * @throws IOException if doing the put or flush fails
   */
  private void createHFileInRegion(HRegion region, byte[] columnFamily) throws IOException {
    // put one row in the region
    Put p = new Put(Bytes.toBytes("row"));
    p.addColumn(columnFamily, Bytes.toBytes("Qual"), Bytes.toBytes("v1"));
    region.put(p);
    // flush the region to make a store file
    region.flush(true);
  }

  /**
   * Run the given cleaner on a chore service, wait until it has checked all the files (the latch
   * releases), and then stop it.
   */
  private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
    throws InterruptedException {
    final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME");
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");
  }
}