001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.util.LinkedList;
021import java.util.concurrent.atomic.AtomicLong;
022import org.apache.hadoop.hbase.util.Bytes;
023import org.apache.hadoop.hbase.util.ClassSize;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
029import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects.ToStringHelper;
030
031/**
032 * Manages the read/write consistency. This provides an interface for readers to determine what
033 * entries to ignore, and a mechanism for writers to obtain new write numbers, then "commit" the new
034 * writes for readers to read (thus forming atomic transactions).
035 */
036@InterfaceAudience.Private
037public class MultiVersionConcurrencyControl {
038  private static final Logger LOG = LoggerFactory.getLogger(MultiVersionConcurrencyControl.class);
039  private static final long READPOINT_ADVANCE_WAIT_TIME = 10L;
040
041  final String regionName;
042  final AtomicLong readPoint = new AtomicLong(0);
043  final AtomicLong writePoint = new AtomicLong(0);
044  private final Object readWaiters = new Object();
045  /**
046   * Represents no value, or not set.
047   */
048  public static final long NONE = -1;
049
050  // This is the pending queue of writes.
051  //
052  // TODO(eclark): Should this be an array of fixed size to
053  // reduce the number of allocations on the write path?
054  // This could be equal to the number of handlers + a small number.
055  // TODO: St.Ack 20150903 Sounds good to me.
056  private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();
057
058  public MultiVersionConcurrencyControl() {
059    this(null);
060  }
061
062  public MultiVersionConcurrencyControl(String regionName) {
063    this.regionName = regionName;
064  }
065
066  /**
067   * Construct and set read point. Write point is uninitialized.
068   */
069  public MultiVersionConcurrencyControl(long startPoint) {
070    this(null);
071    tryAdvanceTo(startPoint, NONE);
072  }
073
074  /**
075   * Step the MVCC forward on to a new read/write basis.
076   */
077  public void advanceTo(long newStartPoint) {
078    while (true) {
079      long seqId = this.getWritePoint();
080      if (seqId >= newStartPoint) {
081        break;
082      }
083      if (this.tryAdvanceTo(newStartPoint, seqId)) {
084        break;
085      }
086    }
087  }
088
089  /**
090   * Step the MVCC forward on to a new read/write basis.
091   * @param newStartPoint Point to move read and write points to.
092   * @param expected      If not -1 (#NONE)
093   * @return Returns false if <code>expected</code> is not equal to the current
094   *         <code>readPoint</code> or if <code>startPoint</code> is less than current
095   *         <code>readPoint</code>
096   */
097  boolean tryAdvanceTo(long newStartPoint, long expected) {
098    synchronized (writeQueue) {
099      long currentRead = this.readPoint.get();
100      long currentWrite = this.writePoint.get();
101      if (currentRead != currentWrite) {
102        throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead
103          + ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
104      }
105      if (expected != NONE && expected != currentRead) {
106        return false;
107      }
108
109      if (newStartPoint < currentRead) {
110        return false;
111      }
112
113      readPoint.set(newStartPoint);
114      writePoint.set(newStartPoint);
115    }
116    return true;
117  }
118
119  /**
120   * Call {@link #begin(Runnable)} with an empty {@link Runnable}.
121   */
122  public WriteEntry begin() {
123    return begin(() -> {
124    });
125  }
126
127  /**
128   * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
129   * to our queue of ongoing writes. Return this WriteEntry instance. To complete the write
130   * transaction and wait for it to be visible, call {@link #completeAndWait(WriteEntry)}. If the
131   * write failed, call {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of
132   * the failed write transaction.
133   * <p>
134   * The {@code action} will be executed under the lock which means it can keep the same order with
135   * mvcc.
136   * @see #complete(WriteEntry)
137   * @see #completeAndWait(WriteEntry)
138   */
139  public WriteEntry begin(Runnable action) {
140    synchronized (writeQueue) {
141      long nextWriteNumber = writePoint.incrementAndGet();
142      WriteEntry e = new WriteEntry(nextWriteNumber);
143      writeQueue.add(e);
144      action.run();
145      return e;
146    }
147  }
148
149  /**
150   * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs to
151   * complete.
152   */
153  public void await() {
154    // Add a write and then wait on reads to catch up to it.
155    completeAndWait(begin());
156  }
157
158  /**
159   * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the read
160   * point catches up to our write. At the end of this call, the global read point is at least as
161   * large as the write point of the passed in WriteEntry. Thus, the write is visible to MVCC
162   * readers.
163   */
164  public void completeAndWait(WriteEntry e) {
165    if (!complete(e)) {
166      waitForRead(e);
167    }
168  }
169
170  /**
171   * Mark the {@link WriteEntry} as complete and advance the read point as much as possible. Call
172   * this even if the write has FAILED (AFTER backing out the write transaction changes completely)
173   * so we can clean up the outstanding transaction. How much is the read point advanced? Let S be
174   * the set of all write numbers that are completed. Set the read point to the highest numbered
175   * write of S.
176   * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
177   */
178  public boolean complete(WriteEntry writeEntry) {
179    synchronized (writeQueue) {
180      writeEntry.markCompleted();
181      long nextReadValue = NONE;
182      boolean ranOnce = false;
183      while (!writeQueue.isEmpty()) {
184        ranOnce = true;
185        WriteEntry queueFirst = writeQueue.getFirst();
186
187        if (nextReadValue > 0) {
188          if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
189            throw new RuntimeException("Invariant in complete violated, nextReadValue="
190              + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
191          }
192        }
193
194        if (queueFirst.isCompleted()) {
195          nextReadValue = queueFirst.getWriteNumber();
196          writeQueue.removeFirst();
197        } else {
198          break;
199        }
200      }
201
202      if (!ranOnce) {
203        throw new RuntimeException("There is no first!");
204      }
205
206      if (nextReadValue > 0) {
207        synchronized (readWaiters) {
208          readPoint.set(nextReadValue);
209          readWaiters.notifyAll();
210        }
211      }
212      return readPoint.get() >= writeEntry.getWriteNumber();
213    }
214  }
215
216  /**
217   * Wait for the global readPoint to advance up to the passed in write entry number.
218   */
219  void waitForRead(WriteEntry e) {
220    boolean interrupted = false;
221    int count = 0;
222    synchronized (readWaiters) {
223      while (readPoint.get() < e.getWriteNumber()) {
224        if (count % 100 == 0 && count > 0) {
225          long totalWaitTillNow = READPOINT_ADVANCE_WAIT_TIME * count;
226          LOG.warn("STUCK for : " + totalWaitTillNow + " millis. " + this);
227        }
228        count++;
229        try {
230          readWaiters.wait(READPOINT_ADVANCE_WAIT_TIME);
231        } catch (InterruptedException ie) {
232          // We were interrupted... finish the loop -- i.e. cleanup --and then
233          // on our way out, reset the interrupt flag.
234          interrupted = true;
235        }
236      }
237    }
238    if (interrupted) {
239      Thread.currentThread().interrupt();
240    }
241  }
242
243  @Override
244  public String toString() {
245    ToStringHelper helper =
246      MoreObjects.toStringHelper(this).add("readPoint", readPoint).add("writePoint", writePoint);
247    if (this.regionName != null) {
248      helper.add("regionName", this.regionName);
249    }
250    return helper.toString();
251  }
252
253  public long getReadPoint() {
254    return readPoint.get();
255  }
256
257  public long getWritePoint() {
258    return writePoint.get();
259  }
260
261  /**
262   * Write number and whether write has completed given out at start of a write transaction. Every
263   * created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
264   */
265  @InterfaceAudience.Private
266  public static class WriteEntry {
267    private final long writeNumber;
268    private boolean completed = false;
269
270    WriteEntry(long writeNumber) {
271      this.writeNumber = writeNumber;
272    }
273
274    void markCompleted() {
275      this.completed = true;
276    }
277
278    boolean isCompleted() {
279      return this.completed;
280    }
281
282    public long getWriteNumber() {
283      return this.writeNumber;
284    }
285
286    @Override
287    public String toString() {
288      return this.writeNumber + ", " + this.completed;
289    }
290  }
291
292  public static final long FIXED_SIZE =
293    ClassSize.align(ClassSize.OBJECT + 2 * Bytes.SIZEOF_LONG + 2 * ClassSize.REFERENCE);
294}