/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertTrue;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests for trackers created with {@link TimeRangeTracker.Type#SYNC}. The tracker factory methods
 * of {@link TestSimpleTimeRangeTracker} are overridden to hand out SYNC trackers, and two
 * multi-threaded tests are added that update a single shared tracker from several threads.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestSyncTimeRangeTracker extends TestSimpleTimeRangeTracker {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSyncTimeRangeTracker.class);

  /** Number of timestamps generated per {@link RandomTestData} instance. */
  private static final int NUM_KEYS = 8000000;
  /** Number of updater threads used by {@link #testConcurrentIncludeTimestampCorrectness()}. */
  private static final int NUM_OF_THREADS = 20;

  @Override
  protected TimeRangeTracker getTimeRangeTracker() {
    return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
  }

  @Override
  protected TimeRangeTracker getTimeRangeTracker(long min, long max) {
    return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC, min, max);
  }

  /**
   * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive at right range.
   * Here we do ten threads, each covering one million timestamps starting at
   * {@code threadIndex * 1,000,000}. Even-numbered threads walk their slice upwards, odd-numbered
   * threads walk it downwards (inclusive of the upper bound), so max is 10 * 1,000,000 and min is
   * 0.
   */
  @Test
  public void testArriveAtRightAnswer() throws InterruptedException {
    final TimeRangeTracker trr = getTimeRangeTracker();
    final int threadCount = 10;
    final int calls = 1000 * 1000;
    Thread[] threads = new Thread[threadCount];
    for (int i = 0; i < threads.length; i++) {
      // The thread name doubles as the thread's index; it is parsed back inside run().
      Thread t = new Thread("" + i) {
        @Override
        public void run() {
          int offset = Integer.parseInt(getName());
          // offset * calls is at most 9,000,000 here, so int arithmetic cannot overflow.
          int base = offset * calls;
          if (offset % 2 == 0) {
            // Ascending over [base, base + calls). The upper bound must be relative to base;
            // bounding by `calls` alone would leave every even thread except 0 with no work.
            for (int ts = base; ts < base + calls; ts++) {
              trr.includeTimestamp(ts);
            }
          } else {
            // Descending over [base, base + calls], inclusive on both ends.
            for (int ts = base + calls; ts >= base; ts--) {
              trr.includeTimestamp(ts);
            }
          }
        }
      };
      t.start();
      threads[i] = t;
    }
    for (Thread thread : threads) {
      thread.join();
    }

    // The last (odd) thread starts at 9 * calls + calls, which is the overall maximum.
    final long expectedMax = (long) calls * threadCount;
    assertTrue("expected max " + expectedMax + " but was " + trr.getMax(),
      trr.getMax() == expectedMax);
    assertTrue("expected min 0 but was " + trr.getMin(), trr.getMin() == 0);
  }

  /**
   * Pre-generated timestamps for one updater thread. Slot {@code i} holds {@code i} plus a random
   * jitter in {@code [0, NUM_OF_THREADS)}; the smallest and largest generated values are recorded
   * so the test can compute the expected range without consulting the tracker.
   */
  static class RandomTestData {
    private final long[] keys = new long[NUM_KEYS];
    private long min = Long.MAX_VALUE;
    private long max = 0;

    public RandomTestData() {
      // NOTE(review): both branches store into keys[i] by index, so the resulting array has the
      // same shape either way; only the order in which random values are drawn differs. Kept
      // as-is to preserve the original data generation.
      if (ThreadLocalRandom.current().nextInt(NUM_OF_THREADS) % 2 == 0) {
        for (int i = 0; i < NUM_KEYS; i++) {
          fill(i);
        }
      } else {
        for (int i = NUM_KEYS - 1; i >= 0; i--) {
          fill(i);
        }
      }
    }

    /** Generates the key for slot {@code i} and folds it into the running min/max. */
    private void fill(int i) {
      long key = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
      keys[i] = key;
      if (key < min) {
        min = key;
      }
      if (key > max) {
        max = key;
      }
    }

    public long getMax() {
      return this.max;
    }

    public long getMin() {
      return this.min;
    }
  }

  /** Feeds every key of one {@link RandomTestData} into a (shared) tracker. */
  static class TrtUpdateRunnable implements Runnable {

    private final TimeRangeTracker trt;
    private final RandomTestData data;

    public TrtUpdateRunnable(final TimeRangeTracker trt, final RandomTestData data) {
      this.trt = trt;
      this.data = data;
    }

    @Override
    public void run() {
      for (long key : data.keys) {
        trt.includeTimestamp(key);
      }
    }
  }

  /**
   * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive at right range.
   * The data chosen is going to ensure that there are lots of collisions, i.e., some other threads
   * may already have updated the value while one tries to update the min/max value.
   */
  @Test
  public void testConcurrentIncludeTimestampCorrectness() {
    RandomTestData[] testData = new RandomTestData[NUM_OF_THREADS];
    long min = Long.MAX_VALUE, max = 0;
    for (int i = 0; i < NUM_OF_THREADS; i++) {
      testData[i] = new RandomTestData();
      if (testData[i].getMin() < min) {
        min = testData[i].getMin();
      }
      if (testData[i].getMax() > max) {
        max = testData[i].getMax();
      }
    }

    TimeRangeTracker trt = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);

    Thread[] t = new Thread[NUM_OF_THREADS];
    for (int i = 0; i < NUM_OF_THREADS; i++) {
      t[i] = new Thread(new TrtUpdateRunnable(trt, testData[i]));
      t[i].start();
    }

    for (Thread thread : t) {
      try {
        thread.join();
      } catch (InterruptedException e) {
        // Asserting after an aborted join would check a half-updated tracker; restore the
        // interrupt flag and fail loudly instead of printing and carrying on.
        Thread.currentThread().interrupt();
        throw new AssertionError("Interrupted while waiting for updater threads to finish", e);
      }
    }

    assertTrue("expected min " + min + " but was " + trt.getMin(), min == trt.getMin());
    assertTrue("expected max " + max + " but was " + trt.getMax(), max == trt.getMax());
  }
}