001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.ThreadLocalRandom;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.client.Durability;
033import org.apache.hadoop.hbase.client.Increment;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.client.TestIncrementsFromClientSide;
036import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.junit.After;
041import org.junit.Before;
042import org.junit.ClassRule;
043import org.junit.Rule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.junit.rules.TestName;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * Increments with some concurrency against a region to ensure we get the right answer. Test is
052 * parameterized to run the fast and slow path increments; if fast,
053 * HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY is true.
054 * <p>
055 * There is similar test up in TestAtomicOperation. It does a test where it has 100 threads doing
056 * increments across two column families all on one row and the increments are connected to prove
057 * atomicity on row.
058 */
059@Category(MediumTests.class)
060public class TestRegionIncrement {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestRegionIncrement.class);
065
066  private static final Logger LOG = LoggerFactory.getLogger(TestRegionIncrement.class);
067  @Rule
068  public TestName name = new TestName();
069  private static HBaseTestingUtility TEST_UTIL;
070  private final static byte[] INCREMENT_BYTES = Bytes.toBytes("increment");
071  private static final int THREAD_COUNT = 10;
072  private static final int INCREMENT_COUNT = 10000;
073
074  @Before
075  public void setUp() throws Exception {
076    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
077  }
078
079  @After
080  public void tearDown() throws Exception {
081    TEST_UTIL.cleanupTestDir();
082  }
083
084  private HRegion getRegion(final Configuration conf, final String tableName) throws IOException {
085    FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
086      TEST_UTIL.getDataTestDir().toString(), conf);
087    wal.init();
088    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
089      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
090    return (HRegion) TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
091      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf, false,
092      Durability.SKIP_WAL, wal, INCREMENT_BYTES);
093  }
094
095  private void closeRegion(final HRegion region) throws IOException {
096    region.close();
097    region.getWAL().close();
098  }
099
100  @Test
101  public void testMVCCCausingMisRead() throws IOException {
102    final HRegion region = getRegion(TEST_UTIL.getConfiguration(), this.name.getMethodName());
103    try {
104      // ADD TEST HERE!!
105    } finally {
106      closeRegion(region);
107    }
108  }
109
110  /**
111   * Increments a single cell a bunch of times.
112   */
113  private static class SingleCellIncrementer extends Thread {
114    private final int count;
115    private final HRegion region;
116    private final Increment increment;
117
118    SingleCellIncrementer(final int i, final int count, final HRegion region,
119      final Increment increment) {
120      super("" + i);
121      setDaemon(true);
122      this.count = count;
123      this.region = region;
124      this.increment = increment;
125    }
126
127    @Override
128    public void run() {
129      for (int i = 0; i < this.count; i++) {
130        try {
131          this.region.increment(this.increment);
132          // LOG.info(getName() + " " + i);
133        } catch (IOException e) {
134          throw new RuntimeException(e);
135        }
136      }
137    }
138  }
139
140  /**
141   * Increments a random row's Cell <code>count</code> times.
142   */
143  private static class CrossRowCellIncrementer extends Thread {
144    private final int count;
145    private final HRegion region;
146    private final Increment[] increments;
147
148    CrossRowCellIncrementer(final int i, final int count, final HRegion region, final int range) {
149      super("" + i);
150      setDaemon(true);
151      this.count = count;
152      this.region = region;
153      this.increments = new Increment[range];
154      for (int ii = 0; ii < range; ii++) {
155        this.increments[ii] = new Increment(Bytes.toBytes(i));
156        this.increments[ii].addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
157      }
158    }
159
160    @Override
161    public void run() {
162      for (int i = 0; i < this.count; i++) {
163        try {
164          int index = ThreadLocalRandom.current().nextInt(0, this.increments.length);
165          this.region.increment(this.increments[index]);
166          // LOG.info(getName() + " " + index);
167        } catch (IOException e) {
168          throw new RuntimeException(e);
169        }
170      }
171    }
172  }
173
174  /**
175   * Have each thread update its own Cell. Avoid contention with another thread.
176   */
177  @Test
178  public void testUnContendedSingleCellIncrement() throws IOException, InterruptedException {
179    final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
180      TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
181    long startTime = EnvironmentEdgeManager.currentTime();
182    try {
183      SingleCellIncrementer[] threads = new SingleCellIncrementer[THREAD_COUNT];
184      for (int i = 0; i < threads.length; i++) {
185        byte[] rowBytes = Bytes.toBytes(i);
186        Increment increment = new Increment(rowBytes);
187        increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
188        threads[i] = new SingleCellIncrementer(i, INCREMENT_COUNT, region, increment);
189      }
190      for (int i = 0; i < threads.length; i++) {
191        threads[i].start();
192      }
193      for (int i = 0; i < threads.length; i++) {
194        threads[i].join();
195      }
196      RegionScanner regionScanner = region.getScanner(new Scan());
197      List<Cell> cells = new ArrayList<>(THREAD_COUNT);
198      while (regionScanner.next(cells))
199        continue;
200      assertEquals(THREAD_COUNT, cells.size());
201      long total = 0;
202      for (Cell cell : cells)
203        total += Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
204      assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
205    } finally {
206      closeRegion(region);
207      LOG.info(this.name.getMethodName() + " " + (EnvironmentEdgeManager.currentTime() - startTime)
208        + "ms");
209    }
210  }
211
212  /**
213   * Have each thread update its own Cell. Avoid contention with another thread. This is
214   */
215  @Test
216  public void testContendedAcrossCellsIncrement() throws IOException, InterruptedException {
217    final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
218      TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
219    long startTime = EnvironmentEdgeManager.currentTime();
220    try {
221      CrossRowCellIncrementer[] threads = new CrossRowCellIncrementer[THREAD_COUNT];
222      for (int i = 0; i < threads.length; i++) {
223        threads[i] = new CrossRowCellIncrementer(i, INCREMENT_COUNT, region, THREAD_COUNT);
224      }
225      for (int i = 0; i < threads.length; i++) {
226        threads[i].start();
227      }
228      for (int i = 0; i < threads.length; i++) {
229        threads[i].join();
230      }
231      RegionScanner regionScanner = region.getScanner(new Scan());
232      List<Cell> cells = new ArrayList<>(100);
233      while (regionScanner.next(cells))
234        continue;
235      assertEquals(THREAD_COUNT, cells.size());
236      long total = 0;
237      for (Cell cell : cells)
238        total += Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
239      assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
240    } finally {
241      closeRegion(region);
242      LOG.info(this.name.getMethodName() + " " + (EnvironmentEdgeManager.currentTime() - startTime)
243        + "ms");
244    }
245  }
246}