001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.lang.management.ManagementFactory; 022import java.lang.management.RuntimeMXBean; 023import java.util.ArrayList; 024import java.util.List; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellComparator; 028import org.apache.hadoop.hbase.ExtendedCell; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.InnerStoreCellComparator; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.ClassSize; 035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * The MemStore holds in-memory modifications to the Store. Modifications are {@link Cell}s. When 042 * asked to flush, current memstore is moved to snapshot and is cleared. We continue to serve edits 043 * out of new memstore and backing snapshot until flusher reports in that the flush succeeded. At 044 * this point we let the snapshot go. 045 * <p> 046 * The MemStore functions should not be called in parallel. Callers should hold write and read 047 * locks. This is done in {@link HStore}. 048 * </p> 049 * TODO: Adjust size of the memstore when we remove items because they have been deleted. TODO: With 050 * new KVSLS, need to make sure we update HeapSize with difference in KV size. 051 */ 052@InterfaceAudience.Private 053public class DefaultMemStore extends AbstractMemStore { 054 private static final Logger LOG = LoggerFactory.getLogger(DefaultMemStore.class); 055 056 public final static long DEEP_OVERHEAD = ClassSize.align(AbstractMemStore.DEEP_OVERHEAD); 057 public final static long FIXED_OVERHEAD = ClassSize.align(AbstractMemStore.FIXED_OVERHEAD); 058 059 /** 060 * Default constructor. Used for tests. 061 */ 062 public DefaultMemStore() { 063 this(HBaseConfiguration.create(), InnerStoreCellComparator.INNER_STORE_COMPARATOR, null); 064 } 065 066 /** 067 * Constructor. 068 * @param c Comparator 069 */ 070 public DefaultMemStore(final Configuration conf, final CellComparator c) { 071 super(conf, c, null); 072 } 073 074 /** 075 * Constructor. 076 * @param c Comparator 077 */ 078 public DefaultMemStore(final Configuration conf, final CellComparator c, 079 final RegionServicesForStores regionServices) { 080 super(conf, c, regionServices); 081 } 082 083 /** 084 * Creates a snapshot of the current memstore. Snapshot must be cleared by call to 085 * {@link #clearSnapshot(long)} 086 */ 087 @Override 088 public MemStoreSnapshot snapshot() { 089 // If snapshot currently has entries, then flusher failed or didn't call 090 // cleanup. Log a warning. 091 if (!this.snapshot.isEmpty()) { 092 LOG.warn("Snapshot called again without clearing previous. " 093 + "Doing nothing. Another ongoing flush or did we fail last attempt?"); 094 } else { 095 this.snapshotId = EnvironmentEdgeManager.currentTime(); 096 if (!getActive().isEmpty()) { 097 // Record the ImmutableSegment' heap overhead when initialing 098 MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); 099 ImmutableSegment immutableSegment = 100 SegmentFactory.instance().createImmutableSegment(getActive(), memstoreAccounting); 101 // regionServices can be null when testing 102 if (regionServices != null) { 103 regionServices.addMemStoreSize(memstoreAccounting.getDataSize(), 104 memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(), 105 memstoreAccounting.getCellsCount()); 106 } 107 this.snapshot = immutableSegment; 108 resetActive(); 109 resetTimeOfOldestEdit(); 110 } 111 } 112 return new MemStoreSnapshot(this.snapshotId, this.snapshot); 113 } 114 115 @Override 116 public MemStoreSize getFlushableSize() { 117 MemStoreSize mss = getSnapshotSize(); 118 return mss.getDataSize() > 0 ? mss : getActive().getMemStoreSize(); 119 } 120 121 @Override 122 protected long keySize() { 123 return getActive().getDataSize(); 124 } 125 126 @Override 127 protected long heapSize() { 128 return getActive().getHeapSize(); 129 } 130 131 @Override 132 /** 133 * This method is protected under {@link HStore#lock} read lock. <br/> 134 * Scanners are ordered from 0 (oldest) to newest in increasing order. 135 */ 136 public List<KeyValueScanner> getScanners(long readPt) throws IOException { 137 List<KeyValueScanner> list = new ArrayList<>(); 138 addToScanners(getActive(), readPt, list); 139 addToScanners(getSnapshotSegments(), readPt, list); 140 return list; 141 } 142 143 protected List<Segment> getSnapshotSegments() { 144 return snapshot.getAllSegments(); 145 } 146 147 @Override 148 protected List<Segment> getSegments() throws IOException { 149 List<Segment> list = new ArrayList<>(2); 150 list.add(getActive()); 151 list.add(snapshot); 152 return list; 153 } 154 155 /** 156 * @param cell Find the row that comes after this one. If null, we return the first. 157 * @return Next row or null if none found. 158 */ 159 ExtendedCell getNextRow(final ExtendedCell cell) { 160 return getLowest(getNextRow(cell, this.getActive().getCellSet()), 161 getNextRow(cell, this.snapshot.getCellSet())); 162 } 163 164 @Override 165 public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) { 166 } 167 168 @Override 169 protected boolean preUpdate(MutableSegment currentActive, ExtendedCell cell, 170 MemStoreSizing memstoreSizing) { 171 return true; 172 } 173 174 @Override 175 protected void postUpdate(MutableSegment currentActive) { 176 return; 177 } 178 179 @Override 180 protected boolean sizeAddedPreOperation() { 181 return false; 182 } 183 184 @Override 185 public MemStoreSize size() { 186 return getActive().getMemStoreSize(); 187 } 188 189 @Override 190 public long preFlushSeqIDEstimation() { 191 return HConstants.NO_SEQNUM; 192 } 193 194 @Override 195 public boolean isSloppy() { 196 return false; 197 } 198 199 /** 200 * Code to help figure if our approximation of object heap sizes is close enough. See hbase-900. 201 * Fills memstores then waits so user can heap dump and bring up resultant hprof in something like 202 * jprofiler which allows you get 'deep size' on objects. 203 * @param args main args 204 */ 205 public static void main(String[] args) { 206 RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); 207 LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" + runtime.getVmVendor() 208 + ", vmVersion=" + runtime.getVmVersion()); 209 LOG.info("vmInputArguments=" + runtime.getInputArguments()); 210 DefaultMemStore memstore1 = new DefaultMemStore(); 211 // TODO: x32 vs x64 212 final int count = 10000; 213 byte[] fam = Bytes.toBytes("col"); 214 byte[] qf = Bytes.toBytes("umn"); 215 byte[] empty = new byte[0]; 216 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 217 for (int i = 0; i < count; i++) { 218 // Give each its own ts 219 memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing); 220 } 221 LOG.info("memstore1 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() 222 + memStoreSizing.getMemStoreSize().getHeapSize()); 223 for (int i = 0; i < count; i++) { 224 memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memStoreSizing); 225 } 226 LOG.info("memstore1 estimated size (2nd loading of same data)={}", 227 memStoreSizing.getMemStoreSize().getDataSize() 228 + memStoreSizing.getMemStoreSize().getHeapSize()); 229 // Make a variably sized memstore. 230 DefaultMemStore memstore2 = new DefaultMemStore(); 231 memStoreSizing = new NonThreadSafeMemStoreSizing(); 232 for (int i = 0; i < count; i++) { 233 memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memStoreSizing); 234 } 235 LOG.info("memstore2 estimated size={}", memStoreSizing.getMemStoreSize().getDataSize() 236 + memStoreSizing.getMemStoreSize().getHeapSize()); 237 final int seconds = 30; 238 LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); 239 LOG.info("Exiting."); 240 } 241}