/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;

/**
 * An abstract class, which implements the behaviour shared by all concrete memstore instances.
 */
@InterfaceAudience.Private
public abstract class AbstractMemStore implements MemStore {

  private static final long NO_SNAPSHOT_ID = -1;

  private final Configuration conf;
  private final CellComparator comparator;

  // active segment absorbs write operations
  private volatile MutableSegment active;
  // Snapshot of memstore. Made for flusher.
  protected volatile ImmutableSegment snapshot;
  protected volatile long snapshotId;
  // Used to track when to flush
  private volatile long timeOfOldestEdit;

  protected RegionServicesForStores regionServices;

  // @formatter:off
  public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT
    + (5 * ClassSize.REFERENCE) // conf, comparator, active, snapshot, regionServices
    + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit
  // @formatter:on

  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;

  /**
   * Adds a scanner for every non-empty segment of the given list to {@code scanners}.
   * @param segments the segments to scan
   * @param readPt   the read point handed to each segment scanner
   * @param scanners the list the created scanners are appended to
   */
  public static void addToScanners(List<? extends Segment> segments, long readPt,
    List<KeyValueScanner> scanners) {
    for (Segment item : segments) {
      addToScanners(item, readPt, scanners);
    }
  }

  /**
   * Adds a scanner over {@code segment} to {@code scanners}, unless the segment is empty.
   * @param segment  the segment to scan
   * @param readPt   the read point handed to the segment scanner
   * @param scanners the list the created scanner is appended to
   */
  protected static void addToScanners(Segment segment, long readPt,
    List<KeyValueScanner> scanners) {
    if (!segment.isEmpty()) {
      scanners.add(segment.getScanner(readPt));
    }
  }

  /**
   * Initializes the shared state: a fresh active segment, an unset oldest-edit time and a
   * placeholder snapshot segment that carries no snapshot id.
   * @param conf           the configuration used when creating mutable segments
   * @param c              the comparator used by all segments of this memstore
   * @param regionServices region level services; may be null (e.g. when testing)
   */
  protected AbstractMemStore(final Configuration conf, final CellComparator c,
    final RegionServicesForStores regionServices) {
    this.conf = conf;
    this.comparator = c;
    this.regionServices = regionServices;
    resetActive();
    resetTimeOfOldestEdit();
    this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
    this.snapshotId = NO_SNAPSHOT_ID;
  }

  /**
   * Replaces the active segment with a newly created mutable segment and reports the sizes
   * accumulated while creating it to the region services, when those are available.
   */
  protected void resetActive() {
    // Record the MutableSegment's heap overhead when initializing
    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
    // Reset heap to not include any keys
    active = SegmentFactory.instance().createMutableSegment(conf, comparator, memstoreAccounting);
    // regionServices can be null when testing
    if (regionServices != null) {
      regionServices.addMemStoreSize(memstoreAccounting.getDataSize(),
        memstoreAccounting.getHeapSize(), memstoreAccounting.getOffHeapSize(),
        memstoreAccounting.getCellsCount());
    }
  }

  /**
   * Marks that no edit time is currently recorded; {@link Long#MAX_VALUE} is the "unset" marker
   * which the next add or upsert replaces with the current time.
   */
  protected void resetTimeOfOldestEdit() {
    this.timeOfOldestEdit = Long.MAX_VALUE;
  }

  /**
   * Updates the wal with the lowest sequence id (oldest entry) that is still in memory
   * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
   *                         only if it is greater than the previous sequence id
   */
  public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);

  @Override
  public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
    for (Cell cell : cells) {
      add(cell, memstoreSizing);
    }
  }

  @Override
  public void add(Cell cell, MemStoreSizing memstoreSizing) {
    doAddOrUpsert(cell, 0, memstoreSizing, true);
  }

  /*
   * Inserts the specified Cell into MemStore and deletes any existing versions of the same
   * row/family/qualifier as the specified Cell. <p> First, the specified Cell is inserted into the
   * Memstore. <p> If there are any existing Cell in this MemStore with the same row, family, and
   * qualifier, they are removed. <p> Callers must hold the read lock.
   * @param cell the cell to be updated
   * @param readpoint readpoint below which we can safely remove duplicate KVs
   * @param memstoreSizing object to accumulate changed size
   */
  private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
    doAddOrUpsert(cell, readpoint, memstoreSizing, false);
  }

  /*
   * Applies an add or an upsert to the current active segment. The active segment is re-read and
   * the attempt repeated for as long as preUpdate() refuses the update.
   * @param cell the cell to add or upsert
   * @param readpoint only used for upserts; passed through to doUpsert()
   * @param memstoreSizing object to accumulate changed size
   * @param doAdd true to add the cell, false to upsert it
   */
  private void doAddOrUpsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing,
    boolean doAdd) {
    MutableSegment currentActive;
    boolean succ = false;
    while (!succ) {
      currentActive = getActive();
      succ = preUpdate(currentActive, cell, memstoreSizing);
      if (succ) {
        if (doAdd) {
          doAdd(currentActive, cell, memstoreSizing);
        } else {
          doUpsert(currentActive, cell, readpoint, memstoreSizing);
        }
        postUpdate(currentActive);
      }
    }
  }

  /**
   * Adds the cell to the given active segment, cloning it through the segment's allocator when
   * possible and deep copying it otherwise.
   * @param currentActive  the segment receiving the cell
   * @param cell           the cell to add
   * @param memstoreSizing object to accumulate changed size
   */
  protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
    Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
    boolean mslabUsed = (toAdd != cell);
    // This cell data is backed by the same byte[] where we read request in RPC(See
    // HBASE-15180). By default, MSLAB is ON and we might have copied cell to MSLAB area. If
    // not we must do below deep copy. Or else we will keep referring to the bigger chunk of
    // memory and prevent it from getting GCed.
    // Copy to MSLAB would not have happened if
    // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled"
    // 2. When the size of the cell is bigger than the max size supported by MSLAB. See
    // "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB
    // 3. When cells are from Append/Increment operation.
    if (!mslabUsed) {
      toAdd = deepCopyIfNeeded(toAdd);
    }
    internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing);
  }

  private void doUpsert(MutableSegment currentActive, Cell cell, long readpoint,
    MemStoreSizing memstoreSizing) {
    // Add the Cell to the MemStore
    // Use the internalAdd method here since we (a) already have a lock
    // and (b) cannot safely use the MSLAB here without potentially
    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
    // test that triggers the pathological case if we don't avoid MSLAB
    // here.
    // This cell data is backed by the same byte[] where we read request in RPC(See
    // HBASE-15180). We must do below deep copy. Or else we will keep referring to the bigger
    // chunk of memory and prevent it from getting GCed.
    cell = deepCopyIfNeeded(cell);
    boolean sizeAddedPreOperation = sizeAddedPreOperation();
    currentActive.upsert(cell, readpoint, memstoreSizing, sizeAddedPreOperation);
    setOldestEditTimeToNow();
  }

  /**
   * Issue any synchronization and test needed before applying the update
   * @param currentActive  the segment to be updated
   * @param cell           the cell to be added
   * @param memstoreSizing object to accumulate region size changes
   * @return true iff can proceed with applying the update
   */
  protected abstract boolean preUpdate(MutableSegment currentActive, Cell cell,
    MemStoreSizing memstoreSizing);

  /**
   * Issue any post update synchronization and tests
   * @param currentActive updated segment
   */
  protected abstract void postUpdate(MutableSegment currentActive);

  /*
   * Returns a deep clone of the cell when it is an ExtendedCell; any other cell is returned
   * unchanged.
   */
  private static Cell deepCopyIfNeeded(Cell cell) {
    if (cell instanceof ExtendedCell) {
      return ((ExtendedCell) cell).deepClone();
    }
    return cell;
  }

  @Override
  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
    for (Cell cell : cells) {
      upsert(cell, readpoint, memstoreSizing);
    }
  }

  /**
   * Returns the time recorded when the first edit was applied after the last call to
   * {@link #resetTimeOfOldestEdit()}, or {@link Long#MAX_VALUE} if no edit was applied since.
   */
  @Override
  public long timeOfOldestEdit() {
    return timeOfOldestEdit;
  }

  /**
   * This method is protected under {@link HStore#lock} write lock,<br/>
   * and this method is used by {@link HStore#updateStorefiles} after flushing is completed.<br/>
   * The passed snapshot was successfully persisted; it can be let go.
   * @param id Id of the snapshot to clean out.
   * @throws UnexpectedStateException if a snapshot exists and its id differs from {@code id}
   * @see MemStore#snapshot()
   */
  @Override
  public void clearSnapshot(long id) throws UnexpectedStateException {
    if (this.snapshotId == NO_SNAPSHOT_ID) {
      return; // already cleared
    }
    if (this.snapshotId != id) {
      throw new UnexpectedStateException(
        "Current snapshot id is " + this.snapshotId + ",passed " + id);
    }
    // OK. Passed in snapshot is same as current snapshot. If not-empty,
    // create a new snapshot and let the old one go.
    doClearSnapShot();
  }

  /**
   * Drops the current snapshot: a non-empty snapshot is replaced by a newly created immutable
   * segment, the snapshot id is reset and the previous snapshot segment is closed.
   */
  protected void doClearSnapShot() {
    // Capture the volatile field once so the emptiness check and close() see the same segment.
    Segment oldSnapshot = this.snapshot;
    if (!oldSnapshot.isEmpty()) {
      this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
    }
    this.snapshotId = NO_SNAPSHOT_ID;
    oldSnapshot.close();
  }

  @Override
  public MemStoreSize getSnapshotSize() {
    return this.snapshot.getMemStoreSize();
  }

  /**
   * Lists all segments returned by {@link #getSegments()}, numbered from 1. If the segments cannot
   * be obtained, the string form of the {@link IOException} is returned instead.
   */
  @Override
  public String toString() {
    StringBuilder buf = new StringBuilder();
    int i = 1;
    try {
      for (Segment segment : getSegments()) {
        buf.append("Segment (").append(i).append(") ").append(segment.toString()).append("; ");
        i++;
      }
    } catch (IOException e) {
      return e.toString();
    }
    return buf.toString();
  }

  protected Configuration getConfiguration() {
    return conf;
  }

  /**
   * Dumps the active segment and the snapshot segment to the given logger.
   * @param log the logger to dump to
   */
  protected void dump(Logger log) {
    getActive().dump(log);
    snapshot.dump(log);
  }

  /**
   * Picks the cell whose row sorts first; only rows are compared.
   * @param a first candidate, may be null
   * @param b second candidate, may be null
   * @return the lowest of a or b (a on equal rows), or null if both a and b are null
   */
  protected Cell getLowest(final Cell a, final Cell b) {
    if (a == null) {
      return b;
    }
    if (b == null) {
      return a;
    }
    return comparator.compareRows(a, b) <= 0 ? a : b;
  }

  /**
   * Finds the first cell of {@code set} that belongs to a row after the row of {@code key}.
   * @param key Find row that follows this one. If null, return the first cell of the set.
   * @param set Set to look in for a row beyond the row of {@code key}.
   * @return The first cell of the next row (the element held by the set, not a copy), or null if
   *         none found.
   */
  protected Cell getNextRow(final Cell key, final NavigableSet<Cell> set) {
    Cell result = null;
    SortedSet<Cell> tail = key == null ? set : set.tailSet(key);
    // Iterate until we fall into the next row; i.e. move off current row
    for (Cell cell : tail) {
      // With no key there is no current row to move off, so the very first cell qualifies;
      // the comparator must not be handed a null key.
      if (key != null && comparator.compareRows(cell, key) <= 0) {
        continue;
      }
      // Note: Not suppressing deletes or expired cells. Needs to be handled
      // by higher up functions.
      result = cell;
      break;
    }
    return result;
  }

  /**
   * If the segment has a memory allocator the cell is being cloned to this space, and returned;
   * Otherwise the given cell is returned When a cell's size is too big (bigger than maxAlloc), it
   * is not allocated on MSLAB. Since the process of flattening to CellChunkMap assumes that all
   * cells are allocated on MSLAB, during this process, the input parameter forceCloneOfBigCell is
   * set to 'true' and the cell is copied into MSLAB.
   * @param currentActive       the segment whose allocator is used
   * @param cell                the cell to clone
   * @param forceCloneOfBigCell true only during the process of flattening to CellChunkMap.
   * @return either the given cell or its clone
   */
  private Cell maybeCloneWithAllocator(MutableSegment currentActive, Cell cell,
    boolean forceCloneOfBigCell) {
    return currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
  }

  /*
   * Internal version of add() that doesn't clone Cells with the allocator, and doesn't take the
   * lock. Callers should ensure they already have the read lock taken
   * @param currentActive the segment receiving the cell
   * @param toAdd the cell to add
   * @param mslabUsed whether using MSLAB
   * @param memstoreSizing object to accumulate changed size
   */
  private void internalAdd(MutableSegment currentActive, final Cell toAdd, final boolean mslabUsed,
    MemStoreSizing memstoreSizing) {
    boolean sizeAddedPreOperation = sizeAddedPreOperation();
    currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation);
    setOldestEditTimeToNow();
  }

  protected abstract boolean sizeAddedPreOperation();

  /*
   * Records the current time as the oldest edit time, but only when no time is recorded yet
   * (i.e. the field still holds the Long.MAX_VALUE marker).
   */
  private void setOldestEditTimeToNow() {
    if (timeOfOldestEdit == Long.MAX_VALUE) {
      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
    }
  }

  /**
   * Returns The total size of cells in this memstore. We will not consider cells in the snapshot
   */
  protected abstract long keySize();

  /**
   * @return The total heap size of cells in this memstore. We will not consider cells in the
   *         snapshot
   */
  protected abstract long heapSize();

  protected CellComparator getComparator() {
    return comparator;
  }

  MutableSegment getActive() {
    return active;
  }

  ImmutableSegment getSnapshot() {
    return snapshot;
  }

  @Override
  public void close() {
    // active should never be null
    active.close();
    // for snapshot, either it is empty, where we do not reference any real segment which contains a
    // memstore lab, or it is during snapshot, where we will clear it when calling clearSnapshot, so
    // we do not need to close it here
  }

  /** Returns an ordered list of segments from most recent to oldest in memstore */
  protected abstract List<Segment> getSegments() throws IOException;

}