001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.EnumSet; 026import java.util.List; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.ClusterMetrics.Option; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.TableNotFoundException; 034import org.apache.hadoop.hbase.client.Admin; 035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 037import org.apache.hadoop.hbase.client.TableDescriptor; 038import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 039import org.apache.hadoop.hbase.io.compress.Compression; 040import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 041import org.apache.hadoop.hbase.testclassification.MediumTests; 042import org.apache.hadoop.hbase.testclassification.MiscTests; 043import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 044import org.junit.After; 045import org.junit.Before; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049import org.junit.runner.RunWith; 050import org.junit.runners.Parameterized; 051import org.junit.runners.Parameterized.Parameters; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * A write/read/verify load test on a mini HBase cluster. Tests reading and then writing. 057 */ 058@Category({ MiscTests.class, MediumTests.class }) 059@RunWith(Parameterized.class) 060public class TestMiniClusterLoadSequential { 061 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestMiniClusterLoadSequential.class); 065 066 private static final Logger LOG = LoggerFactory.getLogger(TestMiniClusterLoadSequential.class); 067 068 protected static final TableName TABLE = TableName.valueOf("load_test_tbl"); 069 protected static final byte[] CF = Bytes.toBytes("load_test_cf"); 070 protected static final int NUM_THREADS = 8; 071 protected static final int NUM_RS = 2; 072 protected static final int TIMEOUT_MS = 180000; 073 protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 074 075 protected final Configuration conf = TEST_UTIL.getConfiguration(); 076 protected final boolean isMultiPut; 077 protected final DataBlockEncoding dataBlockEncoding; 078 079 protected MultiThreadedWriter writerThreads; 080 protected MultiThreadedReader readerThreads; 081 protected int numKeys; 082 083 protected Compression.Algorithm compression = Compression.Algorithm.NONE; 084 085 public TestMiniClusterLoadSequential(boolean isMultiPut, DataBlockEncoding dataBlockEncoding) { 086 this.isMultiPut = isMultiPut; 087 this.dataBlockEncoding = dataBlockEncoding; 088 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); 089 090 // We don't want any region reassignments by the load balancer during the test. 091 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, 10.0f); 092 } 093 094 @Parameters 095 public static Collection<Object[]> parameters() { 096 List<Object[]> parameters = new ArrayList<>(); 097 for (boolean multiPut : new boolean[] { false, true }) { 098 for (DataBlockEncoding dataBlockEncoding : new DataBlockEncoding[] { DataBlockEncoding.NONE, 099 DataBlockEncoding.PREFIX }) { 100 parameters.add(new Object[] { multiPut, dataBlockEncoding }); 101 } 102 } 103 return parameters; 104 } 105 106 @Before 107 public void setUp() throws Exception { 108 LOG.debug("Test setup: isMultiPut=" + isMultiPut); 109 TEST_UTIL.startMiniCluster(NUM_RS); 110 } 111 112 @After 113 public void tearDown() throws Exception { 114 LOG.debug("Test teardown: isMultiPut=" + isMultiPut); 115 TEST_UTIL.shutdownMiniCluster(); 116 } 117 118 protected MultiThreadedReader prepareReaderThreads(LoadTestDataGenerator dataGen, 119 Configuration conf, TableName tableName, double verifyPercent) throws IOException { 120 MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent); 121 return reader; 122 } 123 124 protected MultiThreadedWriter prepareWriterThreads(LoadTestDataGenerator dataGen, 125 Configuration conf, TableName tableName) throws IOException { 126 MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName); 127 writer.setMultiPut(isMultiPut); 128 return writer; 129 } 130 131 @Test 132 public void loadTest() throws Exception { 133 prepareForLoadTest(); 134 runLoadTestOnExistingTable(); 135 } 136 137 protected void runLoadTestOnExistingTable() throws IOException { 138 writerThreads.start(0, numKeys, NUM_THREADS); 139 writerThreads.waitForFinish(); 140 assertEquals(0, writerThreads.getNumWriteFailures()); 141 142 readerThreads.start(0, numKeys, NUM_THREADS); 143 readerThreads.waitForFinish(); 144 assertEquals(0, readerThreads.getNumReadFailures()); 145 assertEquals(0, readerThreads.getNumReadErrors()); 146 assertEquals(numKeys, readerThreads.getNumKeysVerified()); 147 } 148 149 protected void createPreSplitLoadTestTable(TableDescriptor tableDescriptor, 150 ColumnFamilyDescriptor familyDescriptor) throws IOException { 151 LoadTestUtil.createPreSplitLoadTestTable(conf, tableDescriptor, familyDescriptor); 152 TEST_UTIL.waitUntilAllRegionsAssigned(tableDescriptor.getTableName()); 153 } 154 155 protected void prepareForLoadTest() throws IOException { 156 LOG.info( 157 "Starting load test: dataBlockEncoding=" + dataBlockEncoding + ", isMultiPut=" + isMultiPut); 158 numKeys = numKeys(); 159 Admin admin = TEST_UTIL.getAdmin(); 160 while ( 161 admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().size() 162 < NUM_RS 163 ) { 164 LOG.info("Sleeping until " + NUM_RS + " RSs are online"); 165 Threads.sleepWithoutInterrupt(1000); 166 } 167 admin.close(); 168 169 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE).build(); 170 ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(CF) 171 .setCompressionType(compression).setDataBlockEncoding(dataBlockEncoding).build(); 172 createPreSplitLoadTestTable(tableDescriptor, familyDescriptor); 173 174 LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF); 175 writerThreads = prepareWriterThreads(dataGen, conf, TABLE); 176 readerThreads = prepareReaderThreads(dataGen, conf, TABLE, 100); 177 } 178 179 protected int numKeys() { 180 return 1000; 181 } 182 183 protected ColumnFamilyDescriptor getColumnDesc(Admin admin) 184 throws TableNotFoundException, IOException { 185 return admin.getDescriptor(TABLE).getColumnFamily(CF); 186 } 187 188}