001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Set; 027import java.util.function.Consumer; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.ExtendedCell; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.monitoring.MonitoredTask; 035import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; 036import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 037import org.apache.hadoop.hbase.regionserver.HMobStore; 038import org.apache.hadoop.hbase.regionserver.HStore; 039import org.apache.hadoop.hbase.regionserver.InternalScanner; 040import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; 041import org.apache.hadoop.hbase.regionserver.ScannerContext; 042import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 043import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; 044import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.util.StringUtils; 047import org.apache.yetus.audience.InterfaceAudience; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap; 052 053/** 054 * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher. If the store is not a 055 * mob store, the flusher flushes the MemStore the same with DefaultStoreFlusher, If the store is a 056 * mob store, the flusher flushes the MemStore into two places. One is the store files of HBase, the 057 * other is the mob files. 058 * <ol> 059 * <li>Cells that are not PUT type or have the delete mark will be directly flushed to HBase.</li> 060 * <li>If the size of a cell value is larger than a threshold, it'll be flushed to a mob file, 061 * another cell with the path of this file will be flushed to HBase.</li> 062 * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to 063 * HBase directly.</li> 064 * </ol> 065 */ 066@InterfaceAudience.Private 067public class DefaultMobStoreFlusher extends DefaultStoreFlusher { 068 069 private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreFlusher.class); 070 private final Object flushLock = new Object(); 071 private long mobCellValueSizeThreshold = 0; 072 private Path targetPath; 073 private HMobStore mobStore; 074 // MOB file reference set 075 static ThreadLocal<Set<String>> mobRefSet = new ThreadLocal<Set<String>>() { 076 @Override 077 protected Set<String> initialValue() { 078 return new HashSet<String>(); 079 } 080 }; 081 082 public DefaultMobStoreFlusher(Configuration conf, HStore store) throws IOException { 083 super(conf, store); 084 if (!(store instanceof HMobStore)) { 085 throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); 086 } 087 mobCellValueSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold(); 088 this.targetPath = 089 MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); 090 if (!this.store.getFileSystem().exists(targetPath)) { 091 this.store.getFileSystem().mkdirs(targetPath); 092 } 093 this.mobStore = (HMobStore) store; 094 } 095 096 /** 097 * Flushes the snapshot of the MemStore. If this store is not a mob store, flush the cells in the 098 * snapshot to store files of HBase. If the store is a mob one, the flusher flushes the MemStore 099 * into two places. One is the store files of HBase, the other is the mob files. 100 * <ol> 101 * <li>Cells that are not PUT type or have the delete mark will be directly flushed to HBase.</li> 102 * <li>If the size of a cell value is larger than a threshold, it'll be flushed to a mob file, 103 * another cell with the path of this file will be flushed to HBase.</li> 104 * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to 105 * HBase directly.</li> 106 * </ol> 107 */ 108 @Override 109 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 110 MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker, 111 Consumer<Path> writerCreationTracker) throws IOException { 112 ArrayList<Path> result = new ArrayList<>(); 113 long cellsCount = snapshot.getCellsCount(); 114 if (cellsCount == 0) return result; // don't flush if there are no entries 115 116 // Use a store scanner to find which rows to flush. 117 InternalScanner scanner = createScanner(snapshot.getScanners(), tracker); 118 StoreFileWriter writer; 119 try { 120 // TODO: We can fail in the below block before we complete adding this flush to 121 // list of store files. Add cleanup of anything put on filesystem if we fail. 122 synchronized (flushLock) { 123 status.setStatus("Flushing " + store + ": creating writer"); 124 // Write the map out to the disk 125 writer = createWriter(snapshot, true, writerCreationTracker); 126 IOException e = null; 127 try { 128 // It's a mob store, flush the cells in a mob way. This is the difference of flushing 129 // between a normal and a mob store. 130 performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController, 131 writerCreationTracker); 132 } catch (IOException ioe) { 133 e = ioe; 134 // throw the exception out 135 throw ioe; 136 } finally { 137 if (e != null) { 138 writer.close(); 139 } else { 140 finalizeWriter(writer, cacheFlushId, status); 141 } 142 } 143 } 144 } finally { 145 scanner.close(); 146 } 147 LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize=" 148 + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getDataSize(), "", 1) 149 + ", hasBloomFilter=" + writer.hasGeneralBloom() + ", into tmp file " + writer.getPath()); 150 result.add(writer.getPath()); 151 return result; 152 } 153 154 /** 155 * Flushes the cells in the mob store. 156 * <ol> 157 * In the mob store, the cells with PUT type might have or have no mob tags. 158 * <li>If a cell does not have a mob tag, flushing the cell to different files depends on the 159 * value length. If the length is larger than a threshold, it's flushed to a mob file and the mob 160 * file is flushed to a store file in HBase. Otherwise, directly flush the cell to a store file in 161 * HBase.</li> 162 * <li>If a cell have a mob tag, its value is a mob file name, directly flush it to a store file 163 * in HBase.</li> 164 * </ol> 165 * @param snapshot Memstore snapshot. 166 * @param cacheFlushId Log cache flush sequence number. 167 * @param scanner The scanner of memstore snapshot. 168 * @param writer The store file writer. 169 * @param status Task that represents the flush operation and may be updated with 170 * status. 171 * @param throughputController A controller to avoid flush too fast. 172 */ 173 protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, 174 InternalScanner scanner, StoreFileWriter writer, MonitoredTask status, 175 ThroughputController throughputController, Consumer<Path> writerCreationTracker) 176 throws IOException { 177 StoreFileWriter mobFileWriter = null; 178 int compactionKVMax = 179 conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); 180 long mobCount = 0; 181 long mobSize = 0; 182 long time = snapshot.getTimeRangeTracker().getMax(); 183 mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst() 184 ? mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(), 185 store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), 186 false) 187 : mobStore.createWriter(new Date(time), snapshot.getCellsCount(), 188 store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), 189 false, writerCreationTracker); 190 // the target path is {tableName}/.mob/{cfName}/mobFiles 191 // the relative path is mobFiles 192 byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); 193 ScannerContext scannerContext = 194 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); 195 List<ExtendedCell> cells = new ArrayList<>(); 196 boolean hasMore; 197 String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush"); 198 boolean control = 199 throughputController != null && !store.getRegionInfo().getTable().isSystemTable(); 200 if (control) { 201 throughputController.start(flushName); 202 } 203 IOException ioe = null; 204 // Clear all past MOB references 205 mobRefSet.get().clear(); 206 try { 207 do { 208 hasMore = scanner.next((List) cells, scannerContext); 209 if (!cells.isEmpty()) { 210 for (ExtendedCell c : cells) { 211 // If we know that this KV is going to be included always, then let us 212 // set its memstoreTS to 0. This will help us save space when writing to 213 // disk. 214 if ( 215 c.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(c) 216 || c.getTypeByte() != KeyValue.Type.Put.getCode() 217 ) { 218 writer.append(c); 219 } else { 220 // append the original keyValue in the mob file. 221 mobFileWriter.append(c); 222 mobSize += c.getValueLength(); 223 mobCount++; 224 // append the tags to the KeyValue. 225 // The key is same, the value is the filename of the mob file 226 ExtendedCell reference = 227 MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()); 228 writer.append(reference); 229 } 230 if (control) { 231 throughputController.control(flushName, c.getSerializedSize()); 232 } 233 } 234 cells.clear(); 235 } 236 } while (hasMore); 237 } catch (InterruptedException e) { 238 ioe = 239 new InterruptedIOException("Interrupted while control throughput of flushing " + flushName); 240 throw ioe; 241 } catch (IOException e) { 242 ioe = e; 243 throw e; 244 } finally { 245 if (control) { 246 throughputController.finish(flushName); 247 } 248 if (ioe != null) { 249 mobFileWriter.close(); 250 } 251 } 252 253 if (mobCount > 0) { 254 // commit the mob file from temp folder to target folder. 255 // If the mob file is committed successfully but the store file is not, 256 // the committed mob file will be handled by the sweep tool as an unused 257 // file. 258 status.setStatus("Flushing mob file " + store + ": appending metadata"); 259 mobFileWriter.appendMetadata(cacheFlushId, false, mobCount); 260 status.setStatus("Flushing mob file " + store + ": closing flushed file"); 261 mobFileWriter.close(); 262 mobStore.commitFile(mobFileWriter.getPath(), targetPath); 263 LOG.debug("Flush store file: {}, store: {}", writer.getPath(), getStoreInfo()); 264 mobStore.updateMobFlushCount(); 265 mobStore.updateMobFlushedCellsCount(mobCount); 266 mobStore.updateMobFlushedCellsSize(mobSize); 267 // Add mob reference to store file metadata 268 mobRefSet.get().add(mobFileWriter.getPath().getName()); 269 } else { 270 try { 271 status.setStatus("Flushing mob file " + store + ": no mob cells, closing flushed file"); 272 mobFileWriter.close(); 273 // If the mob file is empty, delete it instead of committing. 274 store.getFileSystem().delete(mobFileWriter.getPath(), true); 275 } catch (IOException e) { 276 LOG.error("Failed to delete the temp mob file", e); 277 } 278 } 279 } 280 281 @Override 282 protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum, MonitoredTask status) 283 throws IOException { 284 // Write out the log sequence number that corresponds to this output 285 // hfile. Also write current time in metadata as minFlushTime. 286 // The hfile is current up to and including cacheFlushSeqNum. 287 status.setStatus("Flushing " + store + ": appending metadata"); 288 writer.appendMetadata(cacheFlushSeqNum, false); 289 writer.appendMobMetadata(ImmutableSetMultimap.<TableName, String> builder() 290 .putAll(store.getTableName(), mobRefSet.get()).build()); 291 status.setStatus("Flushing " + store + ": closing flushed file"); 292 writer.close(); 293 } 294 295 private String getStoreInfo() { 296 return String.format("[table=%s family=%s region=%s]", store.getTableName().getNameAsString(), 297 store.getColumnFamilyName(), store.getRegionInfo().getEncodedName()); 298 } 299}