001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.Collection; 027import java.util.List; 028import java.util.concurrent.CompletableFuture; 029import java.util.stream.Collectors; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileStatus; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Admin; 039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 041import org.apache.hadoop.hbase.client.CompactionState; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.RegionInfo; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.ResultScanner; 046import org.apache.hadoop.hbase.client.Table; 047import org.apache.hadoop.hbase.client.TableDescriptor; 048import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 049import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; 050import org.apache.hadoop.hbase.regionserver.HRegion; 051import org.apache.hadoop.hbase.regionserver.HStore; 052import org.apache.hadoop.hbase.regionserver.HStoreFile; 053import org.apache.hadoop.hbase.testclassification.MediumTests; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.junit.After; 056import org.junit.Before; 057import org.junit.ClassRule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063/** 064 * Mob file cleaner chore test. 1. Creates MOB table 2. Load MOB data and flushes it N times 3. Runs 065 * major MOB compaction 4. Verifies that number of MOB files in a mob directory is N+1 5. Waits for 066 * a period of time larger than minimum age to archive 6. Runs Mob cleaner chore 7 Verifies that 067 * every old MOB file referenced from current RS was archived 068 */ 069@Category(MediumTests.class) 070public class TestRSMobFileCleanerChore { 071 private static final Logger LOG = LoggerFactory.getLogger(TestRSMobFileCleanerChore.class); 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestRSMobFileCleanerChore.class); 075 076 private HBaseTestingUtil HTU; 077 078 private final static String famStr = "f1"; 079 private final static byte[] fam = Bytes.toBytes(famStr); 080 private final static byte[] qualifier = Bytes.toBytes("q1"); 081 private final static long mobLen = 10; 082 private final static byte[] mobVal = Bytes 083 .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789"); 084 085 private Configuration conf; 086 private TableDescriptor tableDescriptor; 087 private ColumnFamilyDescriptor familyDescriptor; 088 private Admin admin; 089 private Table table = null; 090 private RSMobFileCleanerChore chore; 091 private long minAgeToArchive = 10000; 092 093 public TestRSMobFileCleanerChore() { 094 } 095 096 @Before 097 public void setUp() throws Exception { 098 HTU = new HBaseTestingUtil(); 099 conf = HTU.getConfiguration(); 100 101 initConf(); 102 103 HTU.startMiniCluster(); 104 admin = HTU.getAdmin(); 105 familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true) 106 .setMobThreshold(mobLen).setMaxVersions(1).build(); 107 tableDescriptor = HTU.createModifyableTableDescriptor("testMobCompactTable") 108 .setColumnFamily(familyDescriptor).build(); 109 table = HTU.createTable(tableDescriptor, Bytes.toByteArrays("1")); 110 } 111 112 private void initConf() { 113 114 conf.setInt("hfile.format.version", 3); 115 conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0); 116 conf.setInt("hbase.client.retries.number", 100); 117 conf.setInt("hbase.hregion.max.filesize", 200000000); 118 conf.setInt("hbase.hregion.memstore.flush.size", 800000); 119 conf.setInt("hbase.hstore.blockingStoreFiles", 150); 120 conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800); 121 conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800); 122 // conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY, 123 // FaultyMobStoreCompactor.class.getName()); 124 // Disable automatic MOB compaction 125 conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0); 126 // Disable automatic MOB file cleaner chore 127 conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0); 128 // Set minimum age to archive to 10 sec 129 conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive); 130 // Set compacted file discharger interval to a half minAgeToArchive 131 conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2); 132 } 133 134 private void loadData(Table t, int start, int num) { 135 try { 136 137 for (int i = 0; i < num; i++) { 138 Put p = new Put(Bytes.toBytes(start + i)); 139 p.addColumn(fam, qualifier, mobVal); 140 t.put(p); 141 } 142 admin.flush(t.getName()); 143 } catch (Exception e) { 144 LOG.error("MOB file cleaner chore test FAILED", e); 145 assertTrue(false); 146 } 147 } 148 149 @After 150 public void tearDown() throws Exception { 151 admin.disableTable(tableDescriptor.getTableName()); 152 admin.deleteTable(tableDescriptor.getTableName()); 153 HTU.shutdownMiniCluster(); 154 } 155 156 @Test 157 public void testMobFileCleanerChore() throws InterruptedException, IOException { 158 loadData(table, 0, 10); 159 loadData(table, 10, 10); 160 // loadData(20, 10); 161 long num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); 162 assertEquals(2, num); 163 // Major compact 164 admin.majorCompact(tableDescriptor.getTableName(), fam); 165 // wait until compaction is complete 166 while (admin.getCompactionState(tableDescriptor.getTableName()) != CompactionState.NONE) { 167 Thread.sleep(100); 168 } 169 170 num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); 171 assertEquals(3, num); 172 // We have guarantee, that compcated file discharger will run during this pause 173 // because it has interval less than this wait time 174 LOG.info("Waiting for {}ms", minAgeToArchive + 1000); 175 176 Thread.sleep(minAgeToArchive + 1000); 177 LOG.info("Cleaning up MOB files"); 178 179 ServerName serverUsed = null; 180 List<RegionInfo> serverRegions = null; 181 for (ServerName sn : admin.getRegionServers()) { 182 serverRegions = admin.getRegions(sn); 183 if (serverRegions != null && serverRegions.size() > 0) { 184 // filtering out non test table regions 185 serverRegions = serverRegions.stream().filter(r -> r.getTable() == table.getName()) 186 .collect(Collectors.toList()); 187 // if such one is found use this rs 188 if (serverRegions.size() > 0) { 189 serverUsed = sn; 190 } 191 break; 192 } 193 } 194 195 chore = HTU.getMiniHBaseCluster().getRegionServer(serverUsed).getRSMobFileCleanerChore(); 196 197 chore.chore(); 198 199 num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); 200 assertEquals(3 - serverRegions.size(), num); 201 202 long scanned = scanTable(); 203 assertEquals(20, scanned); 204 205 // creating a MOB file not referenced from the current RS 206 Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(), 207 familyDescriptor, "nonExistentRegion"); 208 209 // verifying the new MOBfile is added 210 num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); 211 assertEquals(4 - serverRegions.size(), num); 212 213 FileSystem fs = FileSystem.get(conf); 214 assertTrue(fs.exists(extraMOBFile)); 215 216 LOG.info("Waiting for {}ms", minAgeToArchive + 1000); 217 218 Thread.sleep(minAgeToArchive + 1000); 219 LOG.info("Cleaning up MOB files"); 220 221 // running chore again 222 chore.chore(); 223 224 // the chore should only archive old MOB files that were referenced from the current RS 225 // the unrelated MOB file is still there 226 num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); 227 assertEquals(4 - serverRegions.size(), num); 228 229 assertTrue(fs.exists(extraMOBFile)); 230 231 scanned = scanTable(); 232 assertEquals(20, scanned); 233 } 234 235 @Test 236 public void testCleaningAndStoreFileReaderCreatedByOtherThreads() 237 throws IOException, InterruptedException { 238 TableName testTable = TableName.valueOf("testCleaningAndStoreFileReaderCreatedByOtherThreads"); 239 ColumnFamilyDescriptor cfDesc = ColumnFamilyDescriptorBuilder.newBuilder(fam) 240 .setMobEnabled(true).setMobThreshold(mobLen).setMaxVersions(1).build(); 241 TableDescriptor tDesc = 242 TableDescriptorBuilder.newBuilder(testTable).setColumnFamily(cfDesc).build(); 243 admin.createTable(tDesc); 244 assertTrue(admin.tableExists(testTable)); 245 246 // put some data 247 loadData(admin.getConnection().getTable(testTable), 0, 10); 248 249 HRegion region = HTU.getHBaseCluster().getRegions(testTable).get(0); 250 HStore store = region.getStore(fam); 251 Collection<HStoreFile> storeFiles = store.getStorefiles(); 252 assertEquals(1, store.getStorefiles().size()); 253 final HStoreFile sf = storeFiles.iterator().next(); 254 assertNotNull(sf); 255 long mobFileNum = getNumberOfMobFiles(conf, testTable, new String(fam)); 256 assertEquals(1, mobFileNum); 257 258 ServerName serverName = null; 259 for (ServerName sn : admin.getRegionServers()) { 260 boolean flag = admin.getRegions(sn).stream().anyMatch( 261 r -> r.getRegionNameAsString().equals(region.getRegionInfo().getRegionNameAsString())); 262 if (flag) { 263 serverName = sn; 264 break; 265 } 266 } 267 assertNotNull(serverName); 268 RSMobFileCleanerChore cleanerChore = 269 HTU.getHBaseCluster().getRegionServer(serverName).getRSMobFileCleanerChore(); 270 CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> { 271 boolean readerIsNotNull = false; 272 try { 273 sf.initReader(); 274 Thread.sleep(1000 * 10); 275 readerIsNotNull = sf.getReader() != null; 276 sf.closeStoreFile(true); 277 } catch (Exception e) { 278 LOG.error("We occur an exception", e); 279 } 280 return readerIsNotNull; 281 }); 282 Thread.sleep(100); 283 // The StoreFileReader object was created by another thread 284 cleanerChore.chore(); 285 Boolean readerIsNotNull = future.join(); 286 assertTrue(readerIsNotNull); 287 admin.disableTable(testTable); 288 admin.deleteTable(testTable); 289 } 290 291 private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family) 292 throws IOException { 293 FileSystem fs = FileSystem.get(conf); 294 Path dir = MobUtils.getMobFamilyPath(conf, tableName, family); 295 FileStatus[] stat = fs.listStatus(dir); 296 for (FileStatus st : stat) { 297 LOG.debug("DDDD MOB Directory content: {} size={}", st.getPath(), st.getLen()); 298 } 299 LOG.debug("MOB Directory content total files: {}", stat.length); 300 301 return stat.length; 302 } 303 304 private long scanTable() { 305 try { 306 307 Result result; 308 ResultScanner scanner = table.getScanner(fam); 309 long counter = 0; 310 while ((result = scanner.next()) != null) { 311 assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal)); 312 counter++; 313 } 314 return counter; 315 } catch (Exception e) { 316 e.printStackTrace(); 317 LOG.error("MOB file cleaner chore test FAILED"); 318 if (HTU != null) { 319 assertTrue(false); 320 } else { 321 System.exit(-1); 322 } 323 } 324 return 0; 325 } 326}