/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests multi-threaded put operations (multiPut) running in parallel against a single region.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestParallelPut {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestParallelPut.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestParallelPut.class);

  @Rule
  public TestName name = new TestName();

  private HRegion region = null;
  private static final HBaseTestingUtil HBTU = new HBaseTestingUtil();
  private static final int THREADS100 = 100;

  // Test fixtures
  static byte[] tableName;
  static final byte[] qual1 = Bytes.toBytes("qual1");
  static final byte[] qual2 = Bytes.toBytes("qual2");
  static final byte[] qual3 = Bytes.toBytes("qual3");
  static final byte[] value1 = Bytes.toBytes("value1");
  static final byte[] value2 = Bytes.toBytes("value2");
  static final byte[] row = Bytes.toBytes("rowA");
  static final byte[] row2 = Bytes.toBytes("rowB");

  @BeforeClass
  public static void beforeClass() {
    // Make sure there are enough handlers to run the puts in parallel.
    HBTU.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREADS100);
  }

  /**
   * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
   */
  @Before
  public void setUp() throws Exception {
    tableName = Bytes.toBytes(name.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    EnvironmentEdgeManagerTestHelper.reset();
    if (region != null) {
      region.close(true);
    }
  }

  public String getName() {
    return name.getMethodName();
  }

  //////////////////////////////////////////////////////////////////////////////
  // New tests that don't spin up a mini cluster but rather just test the
  // individual code pieces in the HRegion.
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Test one put command.
   */
  @Test
  public void testPut() throws IOException {
    LOG.info("Starting testPut");
    this.region = initHRegion(tableName, getName(), fam1);

    long value = 1L;

    Put put = new Put(row);
    put.addColumn(fam1, qual1, Bytes.toBytes(value));
    region.put(put);

    assertGet(this.region, row, fam1, qual1, Bytes.toBytes(value));
  }

  /**
   * Test multi-threaded Puts.
   */
  @Test
  public void testParallelPuts() throws IOException {
    LOG.info("Starting testParallelPuts");

    this.region = initHRegion(tableName, getName(), fam1);
    int numOps = 1000; // number of operations per thread

    // create 100 threads, each will do its own puts
    Putter[] all = new Putter[THREADS100];

    // create all threads
    for (int i = 0; i < THREADS100; i++) {
      all[i] = new Putter(region, i, numOps);
    }

    // run all threads
    for (int i = 0; i < THREADS100; i++) {
      all[i].start();
    }

    // wait for all threads to finish
    for (int i = 0; i < THREADS100; i++) {
      try {
        all[i].join();
      } catch (InterruptedException e) {
        LOG.warn("testParallelPuts encountered InterruptedException. Ignoring.", e);
      }
    }
    LOG.info(
      "testParallelPuts successfully verified " + (numOps * THREADS100) + " put operations.");
  }
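  /**
   * Runs a Get for the given row, family and qualifier against the region and asserts that
   * exactly one cell is returned and that its value matches the expected bytes.
   */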
  private static void assertGet(final HRegion region, byte[] row, byte[] family, byte[] qualifier,
    byte[] value) throws IOException {
    // run a get and see if the value matches
    Get get = new Get(row);
    get.addColumn(family, qualifier);
    Result result = region.get(get);
    assertEquals(1, result.size());

    Cell kv = result.rawCells()[0];
    byte[] r = CellUtil.cloneValue(kv);
    assertTrue(Bytes.compareTo(r, value) == 0);
  }
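  /**
   * Creates a local HRegion for the given table name containing the supplied column families.
   */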
+ " Ignoring....", e); 161 } 162 } 163 LOG 164 .info("testParallelPuts successfully verified " + (numOps * THREADS100) + " put operations."); 165 } 166 167 private static void assertGet(final HRegion region, byte[] row, byte[] familiy, byte[] qualifier, 168 byte[] value) throws IOException { 169 // run a get and see if the value matches 170 Get get = new Get(row); 171 get.addColumn(familiy, qualifier); 172 Result result = region.get(get); 173 assertEquals(1, result.size()); 174 175 Cell kv = result.rawCells()[0]; 176 byte[] r = CellUtil.cloneValue(kv); 177 assertTrue(Bytes.compareTo(r, value) == 0); 178 } 179 180 private HRegion initHRegion(byte[] tableName, String callingMethod, byte[]... families) 181 throws IOException { 182 TableDescriptorBuilder builder = 183 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)); 184 for (byte[] family : families) { 185 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 186 } 187 TableDescriptor tableDescriptor = builder.build(); 188 RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(); 189 return HBTU.createLocalHRegion(info, tableDescriptor); 190 } 191 192 /** 193 * A thread that makes a few put calls 194 */ 195 public static class Putter extends Thread { 196 197 private final HRegion region; 198 private final int threadNumber; 199 private final int numOps; 200 byte[] rowkey = null; 201 202 public Putter(HRegion region, int threadNumber, int numOps) { 203 this.region = region; 204 this.threadNumber = threadNumber; 205 this.numOps = numOps; 206 this.rowkey = Bytes.toBytes((long) threadNumber); // unique rowid per thread 207 setDaemon(true); 208 } 209 210 @Override 211 public void run() { 212 byte[] value = new byte[100]; 213 Put[] in = new Put[1]; 214 215 // iterate for the specified number of operations 216 for (int i = 0; i < numOps; i++) { 217 // generate random bytes 218 Bytes.random(value); 219 220 // put the randombytes and verify that we can read it. This is one 221 // way of ensuring that rwcc manipulation in HRegion.put() is fine. 222 Put put = new Put(rowkey); 223 put.addColumn(fam1, qual1, value); 224 in[0] = put; 225 try { 226 OperationStatus[] ret = region.batchMutate(in); 227 assertEquals(1, ret.length); 228 assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode()); 229 assertGet(this.region, rowkey, fam1, qual1, value); 230 } catch (IOException e) { 231 assertTrue("Thread id " + threadNumber + " operation " + i + " failed.", false); 232 } 233 } 234 } 235 } 236}