001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.TimeUnit; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.ScheduledChore; 029import org.apache.hadoop.hbase.TableDescriptors; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.Admin; 032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 033import org.apache.hadoop.hbase.client.CompactionState; 034import org.apache.hadoop.hbase.client.RegionInfo; 035import org.apache.hadoop.hbase.client.TableDescriptor; 036import org.apache.hadoop.hbase.client.TableState; 037import org.apache.hadoop.hbase.master.HMaster; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * Periodic MOB compaction chore. It runs MOB compaction on region servers in parallel, thus 046 * utilizing distributed cluster resources. To avoid possible major compaction storms, one can 047 * specify maximum number regions to be compacted in parallel by setting configuration parameter: 048 * <br> 049 * 'hbase.mob.major.compaction.region.batch.size', which by default is 0 (unlimited). 050 */ 051@InterfaceAudience.Private 052public class MobFileCompactionChore extends ScheduledChore { 053 054 private static final Logger LOG = LoggerFactory.getLogger(MobFileCompactionChore.class); 055 private HMaster master; 056 private int regionBatchSize = 0;// not set - compact all 057 058 public MobFileCompactionChore(HMaster master) { 059 super(master.getServerName() + "-MobFileCompactionChore", master, 060 master.getConfiguration().getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 061 MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD), 062 master.getConfiguration().getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 063 MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD), 064 TimeUnit.SECONDS); 065 this.master = master; 066 this.regionBatchSize = 067 master.getConfiguration().getInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, 068 MobConstants.DEFAULT_MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE); 069 070 } 071 072 @RestrictedApi(explanation = "Should only be called in tests", link = "", 073 allowedOnPath = ".*/src/test/.*") 074 public MobFileCompactionChore(Configuration conf, int batchSize) { 075 this.regionBatchSize = batchSize; 076 } 077 078 @Override 079 protected void chore() { 080 081 boolean reported = false; 082 083 try (Admin admin = master.getConnection().getAdmin()) { 084 TableDescriptors htds = master.getTableDescriptors(); 085 Map<String, TableDescriptor> map = htds.getAll(); 086 for (TableDescriptor htd : map.values()) { 087 if ( 088 !master.getTableStateManager().isTableState(htd.getTableName(), TableState.State.ENABLED) 089 ) { 090 LOG.info("Skipping MOB compaction on table {} because it is not ENABLED", 091 htd.getTableName()); 092 continue; 093 } else { 094 LOG.info("Starting MOB compaction on table {}, checking {} column families", 095 htd.getTableName(), htd.getColumnFamilyCount()); 096 } 097 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 098 try { 099 if (hcd.isMobEnabled()) { 100 if (!reported) { 101 master.reportMobCompactionStart(htd.getTableName()); 102 reported = true; 103 } 104 LOG.info("Major MOB compacting table={} cf={}", htd.getTableName(), 105 hcd.getNameAsString()); 106 if (regionBatchSize == MobConstants.DEFAULT_MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE) { 107 LOG.debug( 108 "Table={} cf ={}: batch MOB compaction is disabled, {}=0 -" 109 + " all regions will be compacted in parallel", 110 htd.getTableName(), hcd.getNameAsString(), "hbase.mob.compaction.batch.size"); 111 admin.majorCompact(htd.getTableName(), hcd.getName()); 112 } else { 113 LOG.info( 114 "Table={} cf={}: performing MOB major compaction in batches " 115 + "'hbase.mob.compaction.batch.size'={}", 116 htd.getTableName(), hcd.getNameAsString(), regionBatchSize); 117 performMajorCompactionInBatches(admin, htd, hcd); 118 } 119 } else { 120 LOG.debug("Skipping table={} column family={} because it is not MOB-enabled", 121 htd.getTableName(), hcd.getNameAsString()); 122 } 123 } catch (IOException e) { 124 LOG.error("Failed to compact table={} cf={}", htd.getTableName(), hcd.getNameAsString(), 125 e); 126 } catch (InterruptedException ee) { 127 Thread.currentThread().interrupt(); 128 master.reportMobCompactionEnd(htd.getTableName()); 129 LOG.warn("Failed to compact table={} cf={}", htd.getTableName(), hcd.getNameAsString(), 130 ee); 131 // Quit the chore 132 return; 133 } 134 } 135 if (reported) { 136 master.reportMobCompactionEnd(htd.getTableName()); 137 reported = false; 138 } 139 } 140 } catch (IOException e) { 141 LOG.error("Failed to compact", e); 142 } 143 } 144 145 @RestrictedApi(explanation = "Should only be called in tests", link = "", 146 allowedOnPath = ".*(/src/test/.*|MobFileCompactionChore).java") 147 public void performMajorCompactionInBatches(Admin admin, TableDescriptor htd, 148 ColumnFamilyDescriptor hcd) throws IOException, InterruptedException { 149 150 List<RegionInfo> regions = admin.getRegions(htd.getTableName()); 151 if (regions.size() <= this.regionBatchSize) { 152 LOG.debug( 153 "Table={} cf={} - performing major MOB compaction in non-batched mode," 154 + "regions={}, batch size={}", 155 htd.getTableName(), hcd.getNameAsString(), regions.size(), regionBatchSize); 156 admin.majorCompact(htd.getTableName(), hcd.getName()); 157 return; 158 } 159 // Shuffle list of regions in case if they come ordered by region server 160 Collections.shuffle(regions); 161 // Create first batch 162 List<RegionInfo> toCompact = new ArrayList<RegionInfo>(this.regionBatchSize); 163 for (int i = 0; i < this.regionBatchSize; i++) { 164 toCompact.add(regions.remove(0)); 165 } 166 167 // Start compaction now 168 for (RegionInfo ri : toCompact) { 169 startCompaction(admin, htd.getTableName(), ri, hcd.getName()); 170 } 171 172 List<RegionInfo> compacted = new ArrayList<RegionInfo>(toCompact.size()); 173 List<RegionInfo> failed = new ArrayList<RegionInfo>(); 174 int totalCompacted = 0; 175 while (!toCompact.isEmpty()) { 176 // Check status of active compactions 177 for (RegionInfo ri : toCompact) { 178 try { 179 if (admin.getCompactionStateForRegion(ri.getRegionName()) == CompactionState.NONE) { 180 totalCompacted++; 181 LOG.info("Finished major MOB compaction: table={} cf={} region={} compacted regions={}", 182 htd.getTableName(), hcd.getNameAsString(), ri.getRegionNameAsString(), 183 totalCompacted); 184 compacted.add(ri); 185 } 186 } catch (IOException e) { 187 LOG.error( 188 "Could not get compaction state for table={} cf={} region={}, compaction will" 189 + " aborted for the region.", 190 htd.getTableName(), hcd.getNameAsString(), ri.getEncodedName()); 191 LOG.error("Because of:", e); 192 failed.add(ri); 193 } 194 } 195 // Remove failed regions to avoid 196 // endless compaction loop 197 toCompact.removeAll(failed); 198 failed.clear(); 199 // Update batch: remove compacted regions and add new ones 200 for (RegionInfo ri : compacted) { 201 toCompact.remove(ri); 202 if (regions.size() > 0) { 203 RegionInfo region = regions.remove(0); 204 toCompact.add(region); 205 startCompaction(admin, htd.getTableName(), region, hcd.getName()); 206 } 207 } 208 compacted.clear(); 209 210 LOG.debug( 211 "Table={} cf={}. Wait for 10 sec, toCompact size={} regions left={}" 212 + " compacted so far={}", 213 htd.getTableName(), hcd.getNameAsString(), toCompact.size(), regions.size(), 214 totalCompacted); 215 Thread.sleep(10000); 216 } 217 LOG.info("Finished major MOB compacting table={}. cf={}", htd.getTableName(), 218 hcd.getNameAsString()); 219 220 } 221 222 private void startCompaction(Admin admin, TableName table, RegionInfo region, byte[] cf) 223 throws IOException, InterruptedException { 224 225 LOG.info("Started major compaction: table={} cf={} region={}", table, Bytes.toString(cf), 226 region.getRegionNameAsString()); 227 admin.majorCompactRegion(region.getRegionName(), cf); 228 // Wait until it really starts 229 // but with finite timeout 230 long waitTime = 300000; // 5 min 231 long startTime = EnvironmentEdgeManager.currentTime(); 232 while (admin.getCompactionStateForRegion(region.getRegionName()) == CompactionState.NONE) { 233 // Is 1 second too aggressive? 234 Thread.sleep(1000); 235 if (EnvironmentEdgeManager.currentTime() - startTime > waitTime) { 236 LOG.warn("Waited for {} ms to start major MOB compaction on table={} cf={} region={}." 237 + " Stopped waiting for request confirmation. This is not an ERROR, continue next region.", 238 waitTime, table.getNameAsString(), Bytes.toString(cf), region.getRegionNameAsString()); 239 break; 240 } 241 } 242 } 243}