/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * This is a hammer test that verifies MultiVersionConcurrencyControl in a multiple writer single
 * reader scenario.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestMultiVersionConcurrencyControl {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMultiVersionConcurrencyControl.class);

  /**
   * Writer task: until {@code finished} is set, repeatedly begins a write entry on the shared
   * mvcc, holds it open for a short random time, and then calls {@code completeAndWait} on it.
   * A {@link RuntimeException} from {@code completeAndWait} is recorded by clearing
   * {@code status} (and setting {@link #failed}) and ends the task.
   */
  static class Writer implements Runnable {
    final AtomicBoolean finished;
    final MultiVersionConcurrencyControl mvcc;
    final AtomicBoolean status;

    /**
     * Set to {@code true} when this writer stops because of a failure; mirrors {@code status}
     * being cleared. Volatile because it is written on the writer thread.
     */
    public volatile boolean failed = false;

    /**
     * @param finished flag polled at the top of every iteration; the task exits once it is true
     * @param mvcc     the instance the write entries are begun and completed on
     * @param status   set to {@code false} by this task if completing an entry throws
     */
    Writer(AtomicBoolean finished, MultiVersionConcurrencyControl mvcc, AtomicBoolean status) {
      this.finished = finished;
      this.mvcc = mvcc;
      this.status = status;
    }

    @Override
    public void run() {
      while (!finished.get()) {
        MultiVersionConcurrencyControl.WriteEntry e = mvcc.begin();
        // Hold the entry open for 0-499 usec: nextInt(500) yields 0..499 and each unit is
        // scaled by 1000 ns (= 1 usec), so the nanos argument is at most 499,000.
        int sleepTime = ThreadLocalRandom.current().nextInt(500);
        try {
          if (sleepTime > 0) {
            Thread.sleep(0, sleepTime * 1000);
          }
        } catch (InterruptedException ie) {
          // Do not lose the interrupt; the entry begun above is still completed below.
          Thread.currentThread().interrupt();
        }
        try {
          mvcc.completeAndWait(e);
        } catch (RuntimeException ex) {
          // Report the failure to the test through the shared status flag and stop writing.
          System.out.println(ex.toString());
          ex.printStackTrace();
          failed = true;
          status.set(false);
          return;
        }
      }
    }
  }

  /**
   * Runs 20 {@link Writer} threads against one reader thread for ten seconds. The reader polls
   * {@code getReadPoint()} and fails if a value is ever smaller than the one it read immediately
   * before. The test passes when the reader saw no regression and no writer reported a failure.
   * @throws Exception if the test thread is interrupted while sleeping or joining; the worker
   *                   threads are signalled to stop before the exception propagates
   */
  @Test
  public void testParallelism() throws Exception {
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();

    final AtomicBoolean finished = new AtomicBoolean(false);

    // fail flag for the reader thread, plus the read point at which the regression was seen
    final AtomicBoolean readerFailed = new AtomicBoolean(false);
    final AtomicLong failedAt = new AtomicLong();
    Runnable reader = new Runnable() {
      @Override
      public void run() {
        long prev = mvcc.getReadPoint();
        while (!finished.get()) {
          long newPrev = mvcc.getReadPoint();
          if (newPrev < prev) {
            // serious problem.
            System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev);
            failedAt.set(newPrev);
            readerFailed.set(true);
            // might as well give up
            return;
          }
          // Advance the baseline so every read is compared with the one just before it,
          // not only with the very first read point.
          prev = newPrev;
        }
      }
    };

    // writer thread parallelism.
    int n = 20;
    Thread[] writers = new Thread[n];
    AtomicBoolean[] statuses = new AtomicBoolean[n];
    Thread readThread = new Thread(reader);

    for (int i = 0; i < n; ++i) {
      statuses[i] = new AtomicBoolean(true);
      writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
    }

    try {
      for (int i = 0; i < n; ++i) {
        writers[i].start();
      }
      readThread.start();
      Thread.sleep(10 * 1000);
    } finally {
      // Always tell the workers to stop, even if this thread is interrupted or a start fails,
      // so no thread keeps spinning after the test method exits.
      finished.set(true);
    }

    readThread.join();
    for (int i = 0; i < n; ++i) {
      writers[i].join();
    }

    // check failure.
    Assert.assertFalse("Reader saw the read point go backwards, to " + failedAt.get(),
      readerFailed.get());
    for (int i = 0; i < n; ++i) {
      Assert.assertTrue("Writer " + i + " caught a RuntimeException from completeAndWait",
        statuses[i].get());
    }
  }
}