001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.lang.management.ManagementFactory; 022import java.lang.management.RuntimeMXBean; 023import java.util.ArrayList; 024import java.util.List; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellComparator; 028import org.apache.hadoop.hbase.HBaseConfiguration; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.InnerStoreCellComparator; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.util.Bytes; 033import org.apache.hadoop.hbase.util.ClassSize; 034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * The MemStore holds in-memory modifications to the Store. Modifications are {@link Cell}s. When 041 * asked to flush, current memstore is moved to snapshot and is cleared. We continue to serve edits 042 * out of new memstore and backing snapshot until flusher reports in that the flush succeeded. At 043 * this point we let the snapshot go. 044 * <p> 045 * The MemStore functions should not be called in parallel. Callers should hold write and read 046 * locks. This is done in {@link HStore}. 047 * </p> 048 * TODO: Adjust size of the memstore when we remove items because they have been deleted. TODO: With 049 * new KVSLS, need to make sure we update HeapSize with difference in KV size. 050 */ 051@InterfaceAudience.Private 052public class DefaultMemStore extends AbstractMemStore { 053 private static final Logger LOG = LoggerFactory.getLogger(DefaultMemStore.class); 054 055 public final static long DEEP_OVERHEAD = ClassSize.align(AbstractMemStore.DEEP_OVERHEAD); 056 public final static long FIXED_OVERHEAD = ClassSize.align(AbstractMemStore.FIXED_OVERHEAD); 057 058 /** 059 * Default constructor. Used for tests. 060 */ 061 public DefaultMemStore() { 062 this(HBaseConfiguration.create(), InnerStoreCellComparator.INNER_STORE_COMPARATOR, null); 063 } 064 065 /** 066 * Constructor. 067 * @param c Comparator 068 */ 069 public DefaultMemStore(final Configuration conf, final CellComparator c) { 070 super(conf, c, null); 071 } 072 073 /** 074 * Constructor. 075 * @param c Comparator 076 */ 077 public DefaultMemStore(final Configuration conf, final CellComparator c, 078 final RegionServicesForStores regionServices) { 079 super(conf, c, regionServices); 080 } 081 082 /** 083 * Creates a snapshot of the current memstore. Snapshot must be cleared by call to 084 * {@link #clearSnapshot(long)} 085 */ 086 @Override 087 public MemStoreSnapshot snapshot() { 088 // If snapshot currently has entries, then flusher failed or didn't call 089 // cleanup. Log a warning. 090 if (!this.snapshot.isEmpty()) { 091 LOG.warn("Snapshot called again without clearing previous. " 092 + "Doing nothing. Another ongoing flush or did we fail last attempt?"); 093 } else { 094 this.snapshotId = EnvironmentEdgeManager.currentTime(); 095 if (!getActive().isEmpty()) { 096 // Record the ImmutableSegment' heap overhead when initialing 097 MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); 098 ImmutableSegment immutableSegment = 099 SegmentFactory.instance().createImmutableSegment(getActive(), memstoreAccounting); 100 // regionServices can be null when testing 101 if (regionServices != null) { 102 regionServices.addMemStoreSize(memstoreAccounting.getDataSize(), 103 memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(), 104 memstoreAccounting.getCellsCount()); 105 } 106 this.snapshot = immutableSegment; 107 resetActive(); 108 resetTimeOfOldestEdit(); 109 } 110 } 111 return new MemStoreSnapshot(this.snapshotId, this.snapshot); 112 } 113 114 @Override 115 public MemStoreSize getFlushableSize() { 116 MemStoreSize mss = getSnapshotSize(); 117 return mss.getDataSize() > 0 ? mss : getActive().getMemStoreSize(); 118 } 119 120 @Override 121 protected long keySize() { 122 return getActive().getDataSize(); 123 } 124 125 @Override 126 protected long heapSize() { 127 return getActive().getHeapSize(); 128 } 129 130 @Override 131 /** 132 * This method is protected under {@link HStore#lock} read lock. <br/> 133 * Scanners are ordered from 0 (oldest) to newest in increasing order. 134 */ 135 public List<KeyValueScanner> getScanners(long readPt) throws IOException { 136 List<KeyValueScanner> list = new ArrayList<>(); 137 addToScanners(getActive(), readPt, list); 138 addToScanners(getSnapshotSegments(), readPt, list); 139 return list; 140 } 141 142 protected List<Segment> getSnapshotSegments() { 143 return snapshot.getAllSegments(); 144 } 145 146 @Override 147 protected List<Segment> getSegments() throws IOException { 148 List<Segment> list = new ArrayList<>(2); 149 list.add(getActive()); 150 list.add(snapshot); 151 return list; 152 } 153 154 /** 155 * @param cell Find the row that comes after this one. If null, we return the first. 156 * @return Next row or null if none found. 157 */ 158 Cell getNextRow(final Cell cell) { 159 return getLowest(getNextRow(cell, this.getActive().getCellSet()), 160 getNextRow(cell, this.snapshot.getCellSet())); 161 } 162 163 @Override 164 public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) { 165 } 166 167 @Override 168 protected boolean preUpdate(MutableSegment currentActive, Cell cell, 169 MemStoreSizing memstoreSizing) { 170 return true; 171 } 172 173 @Override 174 protected void postUpdate(MutableSegment currentActive) { 175 return; 176 } 177 178 @Override 179 protected boolean sizeAddedPreOperation() { 180 return false; 181 } 182 183 @Override 184 public MemStoreSize size() { 185 return getActive().getMemStoreSize(); 186 } 187 188 @Override 189 public long preFlushSeqIDEstimation() { 190 return HConstants.NO_SEQNUM; 191 } 192 193 @Override 194 public boolean isSloppy() { 195 return false; 196 } 197 198 /** 199 * Code to help figure if our approximation of object heap sizes is close enough. See hbase-900. 200 * Fills memstores then waits so user can heap dump and bring up resultant hprof in something like 201 * jprofiler which allows you get 'deep size' on objects. 202 * @param args main args 203 */ 204 public static void main(String[] args) { 205 RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); 206 LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" + runtime.getVmVendor() 207 + ", vmVersion=" + runtime.getVmVersion()); 208 LOG.info("vmInputArguments=" + runtime.getInputArguments()); 209 DefaultMemStore memstore1 = new DefaultMemStore(); 210 // TODO: x32 vs x64 211 final int count = 10000; 212 byte[] fam = Bytes.toBytes("col"); 213 byte[] qf = Bytes.toBytes("umn"); 214 byte[] empty = new byte[0]; 215 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 216 for (int i = 0; i < count; i++) { 217 // Give each its own ts 218 memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing); 219 } 220 LOG.info("memstore1 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() 221 + memStoreSizing.getMemStoreSize().getHeapSize()); 222 for (int i = 0; i < count; i++) { 223 memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing); 224 } 225 LOG.info("memstore1 estimated size (2nd loading of same data)={}", 226 memStoreSizing.getMemStoreSize().getDataSize() 227 + memStoreSizing.getMemStoreSize().getHeapSize()); 228 // Make a variably sized memstore. 229 DefaultMemStore memstore2 = new DefaultMemStore(); 230 memStoreSizing = new NonThreadSafeMemStoreSizing(); 231 for (int i = 0; i < count; i++) { 232 memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memStoreSizing); 233 } 234 LOG.info("memstore2 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() 235 + memStoreSizing.getMemStoreSize().getHeapSize()); 236 final int seconds = 30; 237 LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); 238 LOG.info("Exiting."); 239 } 240}