001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; 021import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY; 022import static org.junit.Assert.assertEquals; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Collections; 028import java.util.List; 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.Executors; 032import java.util.concurrent.Future; 033import java.util.concurrent.ThreadLocalRandom; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicBoolean; 036import java.util.stream.Collectors; 037import java.util.stream.IntStream; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.MemoryCompactionPolicy; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 044import org.apache.hadoop.hbase.regionserver.CompactingMemStore; 045import org.apache.hadoop.hbase.regionserver.HRegion; 046import org.apache.hadoop.hbase.testclassification.ClientTests; 047import org.apache.hadoop.hbase.testclassification.LargeTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.RetryCounter; 050import org.apache.hadoop.hbase.util.Threads; 051import org.junit.AfterClass; 052import org.junit.Assert; 053import org.junit.BeforeClass; 054import org.junit.ClassRule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 061import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 062 063/** 064 * Will split the table, and move region randomly when testing. 065 */ 066@Category({ LargeTests.class, ClientTests.class }) 067public class TestAsyncTableGetMultiThreaded { 068 069 @ClassRule 070 public static final HBaseClassTestRule CLASS_RULE = 071 HBaseClassTestRule.forClass(TestAsyncTableGetMultiThreaded.class); 072 073 private static final Logger LOG = LoggerFactory.getLogger(TestAsyncTableGetMultiThreaded.class); 074 075 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 076 077 private static final TableName TABLE_NAME = TableName.valueOf("async"); 078 private static final byte[] FAMILY = Bytes.toBytes("cf"); 079 private static final byte[] QUALIFIER = Bytes.toBytes("cq"); 080 private static final int COUNT = 1000; 081 082 private static AsyncConnection CONN; 083 084 private static AsyncTable<?> TABLE; 085 086 private static byte[][] SPLIT_KEYS; 087 088 @BeforeClass 089 public static void setUp() throws Exception { 090 setUp(MemoryCompactionPolicy.NONE); 091 } 092 093 protected static void setUp(MemoryCompactionPolicy memoryCompaction) throws Exception { 094 TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L); 095 TEST_UTIL.getConfiguration().setInt(MAX_BUFFER_COUNT_KEY, 100); 096 TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 097 String.valueOf(memoryCompaction)); 098 TEST_UTIL.getConfiguration().setBoolean("hbase.master.balancer.decision.buffer.enabled", true); 099 100 TEST_UTIL.startMiniCluster(3); 101 SPLIT_KEYS = new byte[8][]; 102 for (int i = 111; i < 999; i += 111) { 103 SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); 104 } 105 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 106 TEST_UTIL.waitTableAvailable(TABLE_NAME); 107 CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 108 TABLE = CONN.getTableBuilder(TABLE_NAME).setReadRpcTimeout(1, TimeUnit.SECONDS) 109 .setMaxRetries(1000).build(); 110 TABLE.putAll( 111 IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) 112 .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))).collect(Collectors.toList())) 113 .get(); 114 } 115 116 @AfterClass 117 public static void tearDown() throws Exception { 118 Closeables.close(CONN, true); 119 TEST_UTIL.shutdownMiniCluster(); 120 } 121 122 private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException { 123 while (!stop.get()) { 124 for (int i = 0; i < COUNT; i++) { 125 assertEquals(i, Bytes.toInt(TABLE.get(new Get(Bytes.toBytes(String.format("%03d", i)))) 126 .get().getValue(FAMILY, QUALIFIER))); 127 } 128 // sleep a bit so we do not add to much load to the test machine as we have 20 threads here 129 Thread.sleep(10); 130 } 131 } 132 133 @Test 134 public void test() throws Exception { 135 LOG.info("====== Test started ======"); 136 int numThreads = 7; 137 AtomicBoolean stop = new AtomicBoolean(false); 138 ExecutorService executor = Executors.newFixedThreadPool(numThreads, 139 new ThreadFactoryBuilder().setNameFormat("TestAsyncGet-pool-%d").setDaemon(true) 140 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 141 List<Future<?>> futures = new ArrayList<>(); 142 IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> { 143 run(stop); 144 return null; 145 }))); 146 LOG.info("====== Scheduled {} read threads ======", numThreads); 147 Collections.shuffle(Arrays.asList(SPLIT_KEYS), ThreadLocalRandom.current()); 148 Admin admin = TEST_UTIL.getAdmin(); 149 for (byte[] splitPoint : SPLIT_KEYS) { 150 int oldRegionCount = admin.getRegions(TABLE_NAME).size(); 151 LOG.info("====== Splitting at {} ======, region count before splitting is {}", 152 Bytes.toStringBinary(splitPoint), oldRegionCount); 153 admin.split(TABLE_NAME, splitPoint); 154 TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 155 @Override 156 public boolean evaluate() throws Exception { 157 return TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).size() > oldRegionCount; 158 } 159 160 @Override 161 public String explainFailure() throws Exception { 162 return "Split has not finished yet"; 163 } 164 }); 165 List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME); 166 LOG.info("====== Split at {} ======, region count after splitting is {}", 167 Bytes.toStringBinary(splitPoint), regions.size()); 168 for (HRegion region : regions) { 169 LOG.info("====== Compact {} ======", region.getRegionInfo()); 170 region.compact(true); 171 } 172 for (HRegion region : regions) { 173 // Waiting for compaction to complete and references are cleaned up 174 LOG.info("====== Waiting for compaction on {} ======", region.getRegionInfo()); 175 RetryCounter retrier = new RetryCounter(30, 1, TimeUnit.SECONDS); 176 for (;;) { 177 try { 178 if ( 179 admin.getCompactionStateForRegion(region.getRegionInfo().getRegionName()) 180 == CompactionState.NONE 181 ) { 182 break; 183 } 184 } catch (IOException e) { 185 LOG.warn("Failed to query"); 186 } 187 if (!retrier.shouldRetry()) { 188 throw new IOException("Can not finish compaction in time after attempt " 189 + retrier.getAttemptTimes() + " times"); 190 } 191 retrier.sleepUntilNextRetry(); 192 } 193 LOG.info("====== Compaction on {} finished, close and archive compacted files ======", 194 region.getRegionInfo()); 195 region.getStores().get(0).closeAndArchiveCompactedFiles(); 196 LOG.info("====== Close and archive compacted files on {} done ======", 197 region.getRegionInfo()); 198 } 199 Thread.sleep(5000); 200 LOG.info("====== Balancing cluster ======"); 201 admin.balance(BalanceRequest.newBuilder().setIgnoreRegionsInTransition(true).build()); 202 LOG.info("====== Balance cluster done ======"); 203 Thread.sleep(5000); 204 ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); 205 ServerName newMetaServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() 206 .map(t -> t.getRegionServer().getServerName()).filter(s -> !s.equals(metaServer)).findAny() 207 .get(); 208 LOG.info("====== Moving meta from {} to {} ======", metaServer, newMetaServer); 209 admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), newMetaServer); 210 LOG.info("====== Move meta done ======"); 211 Thread.sleep(5000); 212 } 213 List<LogEntry> balancerDecisionRecords = 214 admin.getLogEntries(null, "BALANCER_DECISION", ServerType.MASTER, 2, null); 215 Assert.assertEquals(balancerDecisionRecords.size(), 2); 216 LOG.info("====== Read test finished, shutdown thread pool ======"); 217 stop.set(true); 218 executor.shutdown(); 219 for (int i = 0; i < numThreads; i++) { 220 LOG.info("====== Waiting for {} threads to finish, remaining {} ======", numThreads, 221 numThreads - i); 222 futures.get(i).get(); 223 } 224 LOG.info("====== Test test finished ======"); 225 } 226}