/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects.ToStringHelper;

/**
 * Manages the read/write consistency. This provides an interface for readers to determine what
 * entries to ignore, and a mechanism for writers to obtain new write numbers, then "commit" the new
 * writes for readers to read (thus forming atomic transactions).
 * <p>
 * Within this class, the write point and the write queue are only modified while holding the
 * monitor of the write queue; the read point is published while additionally holding the monitor
 * that {@link #waitForRead(WriteEntry)} waits on.
 */
@InterfaceAudience.Private
public class MultiVersionConcurrencyControl {
  private static final Logger LOG = LoggerFactory.getLogger(MultiVersionConcurrencyControl.class);

  /** Timeout, in milliseconds, of a single wait for the read point to advance. */
  private static final long READPOINT_ADVANCE_WAIT_TIME = 10L;

  /** Name of the owning region, used in {@link #toString()} only; may be null. */
  final String regionName;
  final AtomicLong readPoint = new AtomicLong(0);
  final AtomicLong writePoint = new AtomicLong(0);

  /** Monitor that waiters block on until {@link #complete(WriteEntry)} moves the read point. */
  private final Object readWaiters = new Object();

  /**
   * Represents no value, or not set.
   */
  public static final long NONE = -1;

  // This is the pending queue of writes.
  //
  // TODO(eclark): Should this be an array of fixed size to
  // reduce the number of allocations on the write path?
  // This could be equal to the number of handlers + a small number.
  // TODO: St.Ack 20150903 Sounds good to me.
  private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();

  /**
   * Construct an instance without a region name; read and write points both start at 0.
   */
  public MultiVersionConcurrencyControl() {
    this(null);
  }

  /**
   * Construct an instance for the named region; read and write points both start at 0.
   * @param regionName name reported by {@link #toString()}; may be null
   */
  public MultiVersionConcurrencyControl(String regionName) {
    this.regionName = regionName;
  }

  /**
   * Construct an instance without a region name and move both the read point and the write point
   * to {@code startPoint}. A negative {@code startPoint} is silently ignored, leaving both points
   * at 0.
   * @param startPoint initial value for the read and write points
   */
  public MultiVersionConcurrencyControl(long startPoint) {
    this(null);
    tryAdvanceTo(startPoint, NONE);
  }

  /**
   * Step the MVCC forward on to a new read/write basis. Retries until either the write point is
   * already at or beyond {@code newStartPoint}, or {@link #tryAdvanceTo(long, long)} succeeds.
   * @param newStartPoint point to move read and write points to
   * @throws RuntimeException if there are outstanding writes (read point differs from write point)
   */
  public void advanceTo(long newStartPoint) {
    while (true) {
      long seqId = this.getWritePoint();
      if (seqId >= newStartPoint) {
        break;
      }
      if (this.tryAdvanceTo(newStartPoint, seqId)) {
        break;
      }
    }
  }

  /**
   * Step the MVCC forward on to a new read/write basis.
   * @param newStartPoint Point to move read and write points to.
   * @param expected      If not -1 ({@link #NONE}), the value the current read point must have
   *                      for the advance to take place; pass {@link #NONE} to skip this check.
   * @return Returns false if <code>expected</code> is not equal to the current
   *         <code>readPoint</code> or if <code>newStartPoint</code> is less than current
   *         <code>readPoint</code>; true once both points have been moved.
   * @throws RuntimeException if the read point differs from the write point, i.e. writes are
   *                          outstanding
   */
  boolean tryAdvanceTo(long newStartPoint, long expected) {
    synchronized (writeQueue) {
      long currentRead = this.readPoint.get();
      long currentWrite = this.writePoint.get();
      if (currentRead != currentWrite) {
        throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead
          + ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
      }
      if (expected != NONE && expected != currentRead) {
        return false;
      }

      if (newStartPoint < currentRead) {
        return false;
      }

      readPoint.set(newStartPoint);
      writePoint.set(newStartPoint);
    }
    return true;
  }

  /**
   * Call {@link #begin(Runnable)} with an empty {@link Runnable}.
   */
  public WriteEntry begin() {
    return begin(() -> {
    });
  }

  /**
   * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
   * to our queue of ongoing writes. Return this WriteEntry instance. To complete the write
   * transaction and wait for it to be visible, call {@link #completeAndWait(WriteEntry)}. If the
   * write failed, call {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of
   * the failed write transaction.
   * <p>
   * The {@code action} will be executed under the lock which means it can keep the same order with
   * mvcc.
   * <p>
   * If {@code action} throws, the freshly queued entry is completed before the exception is
   * rethrown: the caller never receives the entry in that case, so it could not complete it itself.
   * @param action code to run under the write-queue lock right after the entry is queued
   * @return the newly queued write entry
   * @see #complete(WriteEntry)
   * @see #completeAndWait(WriteEntry)
   */
  public WriteEntry begin(Runnable action) {
    synchronized (writeQueue) {
      long nextWriteNumber = writePoint.incrementAndGet();
      WriteEntry e = new WriteEntry(nextWriteNumber);
      writeQueue.add(e);
      try {
        action.run();
      } catch (RuntimeException | Error t) {
        // Without this the entry would sit uncompleted in the queue forever and the read point
        // could never move past it, blocking every later completeAndWait().
        try {
          complete(e);
        } catch (RuntimeException cleanupFailure) {
          // Keep the action's failure as the primary exception.
          t.addSuppressed(cleanupFailure);
        }
        throw t;
      }
      return e;
    }
  }

  /**
   * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs to
   * complete.
   */
  public void await() {
    // Add a write and then wait on reads to catch up to it.
    completeAndWait(begin());
  }

  /**
   * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the read
   * point catches up to our write. At the end of this call, the global read point is at least as
   * large as the write point of the passed in WriteEntry. Thus, the write is visible to MVCC
   * readers.
   * @param e the write entry to complete and wait on
   */
  public void completeAndWait(WriteEntry e) {
    if (!complete(e)) {
      waitForRead(e);
    }
  }

  /**
   * Mark the {@link WriteEntry} as complete and advance the read point as much as possible. Call
   * this even if the write has FAILED (AFTER backing out the write transaction changes completely)
   * so we can clean up the outstanding transaction. How much is the read point advanced? Completed
   * entries are removed from the head of the queue until the first entry that is not yet completed
   * (or the queue is empty); the read point is set to the write number of the last entry removed.
   * If the head of the queue is not completed, the read point is left untouched.
   * @param writeEntry the write entry to mark as completed
   * @return true if writeEntry is visible to MVCC readers (that is,
   *         {@code readPoint >= writeEntry.getWriteNumber()})
   * @throws RuntimeException if the queue is empty, or if consecutive completed entries do not
   *                          carry consecutive write numbers
   */
  public boolean complete(WriteEntry writeEntry) {
    synchronized (writeQueue) {
      writeEntry.markCompleted();
      long nextReadValue = NONE;
      boolean ranOnce = false;
      while (!writeQueue.isEmpty()) {
        ranOnce = true;
        WriteEntry queueFirst = writeQueue.getFirst();

        // Only checked once an entry has been consumed in this call (nextReadValue still NONE on
        // the first iteration).
        if (nextReadValue > 0) {
          if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
            throw new RuntimeException("Invariant in complete violated, nextReadValue="
              + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
          }
        }

        if (queueFirst.isCompleted()) {
          nextReadValue = queueFirst.getWriteNumber();
          writeQueue.removeFirst();
        } else {
          break;
        }
      }

      if (!ranOnce) {
        throw new RuntimeException("There is no first!");
      }

      if (nextReadValue > 0) {
        // Publish under the waiters' monitor so a waiter cannot miss the notification between its
        // read-point check and its wait().
        synchronized (readWaiters) {
          readPoint.set(nextReadValue);
          readWaiters.notifyAll();
        }
      }
      return readPoint.get() >= writeEntry.getWriteNumber();
    }
  }

  /**
   * Wait for the global readPoint to advance up to the passed in write entry number. An interrupt
   * does not abort the wait; the interrupt status is restored once the read point has caught up.
   * @param e the write entry whose write number the read point has to reach
   */
  void waitForRead(WriteEntry e) {
    boolean interrupted = false;
    int count = 0;
    synchronized (readWaiters) {
      while (readPoint.get() < e.getWriteNumber()) {
        if (count % 100 == 0 && count > 0) {
          // Upper-bound estimate: a wait() that is woken early by notifyAll() or an interrupt is
          // still counted as a full READPOINT_ADVANCE_WAIT_TIME.
          long totalWaitTillNow = READPOINT_ADVANCE_WAIT_TIME * count;
          LOG.warn("STUCK for : {} millis. {}", totalWaitTillNow, this);
        }
        count++;
        try {
          readWaiters.wait(READPOINT_ADVANCE_WAIT_TIME);
        } catch (InterruptedException ie) {
          // We were interrupted... finish the loop -- i.e. cleanup --and then
          // on our way out, reset the interrupt flag.
          interrupted = true;
        }
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Render the read point, the write point and, when set, the region name.
   */
  @Override
  public String toString() {
    ToStringHelper helper =
      MoreObjects.toStringHelper(this).add("readPoint", readPoint).add("writePoint", writePoint);
    if (this.regionName != null) {
      helper.add("regionName", this.regionName);
    }
    return helper.toString();
  }

  /** Returns the current read point. */
  public long getReadPoint() {
    return readPoint.get();
  }

  /** Returns the current write point. */
  public long getWritePoint() {
    return writePoint.get();
  }

  /**
   * Write number and whether write has completed given out at start of a write transaction. Every
   * created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
   */
  @InterfaceAudience.Private
  public static class WriteEntry {
    private final long writeNumber;
    // Not volatile: within the enclosing class it is set and tested only inside complete(), which
    // holds the write-queue lock.
    private boolean completed = false;

    WriteEntry(long writeNumber) {
      this.writeNumber = writeNumber;
    }

    void markCompleted() {
      this.completed = true;
    }

    boolean isCompleted() {
      return this.completed;
    }

    public long getWriteNumber() {
      return this.writeNumber;
    }

    @Override
    public String toString() {
      return this.writeNumber + ", " + this.completed;
    }
  }

  /**
   * Aligned size estimate: object header plus two longs plus two references.
   * NOTE(review): the class declares five reference-typed instance fields and no primitive long
   * fields, so this formula may be stale -- confirm against the heap-size accounting before
   * changing it.
   */
  public static final long FIXED_SIZE =
    ClassSize.align(ClassSize.OBJECT + 2 * Bytes.SIZEOF_LONG + 2 * ClassSize.REFERENCE);
}