/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.IntegrationTestIngest;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * An IntegrationTest for doing reads with a timeout against a read-only table with region
 * replicas. A ChaosMonkey is run which kills the region servers and the master, but ensures that
 * the meta region server is not killed, and that at most two region servers are dead at any point
 * in time. The expected behavior is that all reads with stale mode true will return before the
 * timeout (5 sec by default). The test fails if a read request does not finish in time.
 * <p>
 * This test uses LoadTestTool to read and write the data from a single client but multiple
 * threads. The data is written first, then we allow the region replicas to catch up. Then we
 * start the reader threads doing get requests with stale mode true.
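 * <p>
 * Stale mode means the gets are issued with timeline consistency, so any region replica may
 * serve them. For reference, a minimal sketch of such a get with the standard HBase client API
 * (the {@code table} and {@code row} variables are assumed to be in scope; this snippet is
 * illustrative, not part of the test):
 *
 * <pre>
 * Get get = new Get(row);
 * get.setConsistency(Consistency.TIMELINE); // allow any replica to answer
 * Result result = table.get(get);
 * boolean stale = result.isStale(); // true if a secondary replica served the read
 * </pre>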
 * <p>
 * The ChaosMonkey is started after a delay (20 sec by default) once the reader threads are
 * running, so that there is enough time to fully cache meta. These parameters (and some other
 * parameters from LoadTestTool) can be used to control behavior; the values given are the
 * defaults:
 *
 * <pre>
 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_regions_per_server=5
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.get_timeout_ms=5000
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_keys_per_server=2500
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.region_replication=3
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=20
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=20
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.chaos_monkey_delay=20000
 * </pre>
 *
 * Use this test with the "serverKilling" ChaosMonkey. Sample usage:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.test.IntegrationTestTimeBoundedRequestsWithRegionReplicas
 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=40
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=40
 * -Dhbase.ipc.client.allowsInterrupt=true --monkey serverKilling
 * </pre>
 */
@Category(IntegrationTests.class)
public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends IntegrationTestIngest {

  private static final Logger LOG =
    LoggerFactory.getLogger(IntegrationTestTimeBoundedRequestsWithRegionReplicas.class);

  private static final String TEST_NAME =
    IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName();

  protected static final long DEFAULT_GET_TIMEOUT = 5000; // 5 sec
  protected static final String GET_TIMEOUT_KEY = "get_timeout_ms";

  protected static final long DEFAULT_CHAOS_MONKEY_DELAY = 20 * 1000; // 20 sec
  protected static final String CHAOS_MONKEY_DELAY_KEY = "chaos_monkey_delay";

  protected static final int DEFAULT_REGION_REPLICATION = 3;

  @Override
  protected void startMonkey() throws Exception {
    // Do nothing here: we do not want to start the monkey at the start of the test.
    // It is started later, on a timer, in runIngestTest.
  }

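  // Default to the no-op CALM monkey so nothing destructive runs unless a monkey is requested
  // on the command line; the monkey that gets built is started on a timer in runIngestTest.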
  @Override
  protected MonkeyFactory getDefaultMonkeyFactory() {
    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
  }

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    // default replication for this test is 3
    String clazz = this.getClass().getSimpleName();
    conf.setIfUnset(String.format("%s.%s", clazz, LoadTestTool.OPT_REGION_REPLICATION),
      Integer.toString(DEFAULT_REGION_REPLICATION));
  }

  protected void writeData(int colsPerKey, int recordSize, int writeThreads, long startKey,
    long numKeys) throws IOException {
    int ret = loadTool.run(getArgsForLoadTestTool("-write",
      String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
    if (0 != ret) {
      String errorMsg = "Load failed with error code " + ret;
      LOG.error(errorMsg);
      Assert.fail(errorMsg);
    }
  }

  @Override
  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
    int recordSize, int writeThreads, int readThreads) throws Exception {
    LOG.info("Cluster size: "
      + util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());

    long start = EnvironmentEdgeManager.currentTime();
    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
    long startKey = 0;

    long numKeys = getNumKeys(keysPerServerPerIter);

    // write data once
    LOG.info("Writing some data to the table");
    writeData(colsPerKey, recordSize, writeThreads, startKey, numKeys);

    // flush the table
    LOG.info("Flushing the table");
    Admin admin = util.getAdmin();
    admin.flush(getTablename());

    // re-open the regions to make sure that the replicas are up to date
    long refreshTime =
      conf.getLong(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
    if (refreshTime > 0 && refreshTime <= 10000) {
      LOG.info("Sleeping " + (refreshTime * 3) + "ms to ensure that the data is replicated");
      Threads.sleep(refreshTime * 3);
    } else {
      LOG.info("Reopening the table");
      admin.disableTable(getTablename());
      admin.enableTable(getTablename());
    }

    // We should only start the ChaosMonkey after the readers are started and have cached
    // all of the region locations. Because meta is not replicated, the time-bounded reads
    // will time out if the meta server is killed.
    // We start the chaos monkey after the configured delay (20 seconds by default), and since
    // the readers are reading random keys, that should be enough time to cache every region
    // entry.
    long chaosMonkeyDelay = conf.getLong(String.format("%s.%s", TEST_NAME, CHAOS_MONKEY_DELAY_KEY),
      DEFAULT_CHAOS_MONKEY_DELAY);
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    LOG.info(String.format("ChaosMonkey delay is %d seconds; will start the %s ChaosMonkey "
      + "after the delay", chaosMonkeyDelay / 1000, monkeyToUse));
    ScheduledFuture<?> result = executorService.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          LOG.info("Starting ChaosMonkey");
          monkey.start();
          monkey.waitForStop();
        } catch (Exception e) {
          LOG.warn(StringUtils.stringifyException(e));
        }
      }
    }, chaosMonkeyDelay, TimeUnit.MILLISECONDS);
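    // Note: the ScheduledFuture above is kept so the finally block below can cancel the
    // monkey's start if the readers finish (or fail) before the delay has elapsed.
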
    // Set the intended run time for the reader. The reader will issue read requests to random
    // keys for this amount of time.
    long remainingTime = runtime - (EnvironmentEdgeManager.currentTime() - start);
    if (remainingTime <= 0) {
      LOG.error("The amount of time left for the test to perform random reads is "
        + "non-positive. Increase the test execution time via "
        + String.format(RUN_TIME_KEY,
          IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName())
        + " or reduce the amount of data written per server via "
        + IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName() + "."
        + IntegrationTestIngest.NUM_KEYS_PER_SERVER_KEY);
      throw new IllegalArgumentException("No time remains to execute random reads");
    }
    LOG.info("Reading random keys from the table for " + remainingTime / 60000 + " min");
    this.conf.setLong(
      String.format(RUN_TIME_KEY, TimeBoundedMultiThreadedReader.class.getSimpleName()),
      remainingTime); // the load tool shares the same conf

    // now start the readers, which will run for the configured remaining time
    try {
      int ret = loadTool.run(
        getArgsForLoadTestTool("-read", String.format("100:%d", readThreads), startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Verification failed with error code " + ret;
        LOG.error(errorMsg);
        Assert.fail(errorMsg);
      }
    } finally {
      if (result != null) {
        result.cancel(false);
      }
      monkey.stop("Stopping the test");
      monkey.waitForStop();
      executorService.shutdown();
    }
  }

  @Override
  protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
    long numKeys) {
    List<String> args =
      Lists.newArrayList(super.getArgsForLoadTestTool(mode, modeSpecificArg, startKey, numKeys));
    args.add("-reader");
    args.add(TimeBoundedMultiThreadedReader.class.getName());
    return args.toArray(new String[args.size()]);
  }

  public static class TimeBoundedMultiThreadedReader extends MultiThreadedReader {
    protected long timeoutNano;
    protected AtomicLong timedOutReads = new AtomicLong();
    protected long runTime;
    protected Thread timeoutThread;
    protected AtomicLong staleReads = new AtomicLong();

    public TimeBoundedMultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
      TableName tableName, double verifyPercent) throws IOException {
      super(dataGen, conf, tableName, verifyPercent);
      long timeoutMs =
        conf.getLong(String.format("%s.%s", TEST_NAME, GET_TIMEOUT_KEY), DEFAULT_GET_TIMEOUT);
      timeoutNano = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
      LOG.info("Timeout for gets: " + timeoutMs + "ms");
      String runTimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
      this.runTime = conf.getLong(runTimeKey, -1);
      if (this.runTime <= 0) {
        throw new IllegalArgumentException("Please configure " + runTimeKey);
      }
    }

    @Override
    public void waitForFinish() {
      try {
        this.timeoutThread.join();
      } catch (InterruptedException e) {
        // Restore interrupt status
        Thread.currentThread().interrupt();
      }
      this.aborted = true;
      super.waitForFinish();
    }

    @Override
    protected String progressInfo() {
      StringBuilder builder = new StringBuilder(super.progressInfo());
      appendToStatus(builder, "stale_reads", staleReads.get());
      appendToStatus(builder, "get_timeouts", timedOutReads.get());
      return builder.toString();
    }

    @Override
    public void start(long startKey, long endKey, int numThreads) throws IOException {
      super.start(startKey, endKey, numThreads);
      this.timeoutThread = new TimeoutThread(this.runTime);
      this.timeoutThread.start();
    }
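    // TimeoutThread (defined below) is the wall clock for the run: waitForFinish() above joins
    // it and then raises the aborted flag, which stops the reader threads launched by start().
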
    @Override
    protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
      return new TimeBoundedMultiThreadedReaderThread(readerId);
    }

    private static class TimeoutThread extends Thread {
      long timeout;
      long reportInterval = 60000;

      public TimeoutThread(long timeout) {
        this.timeout = timeout;
      }

      @Override
      public void run() {
        while (true) {
          long rem = Math.min(timeout, reportInterval);
          if (rem <= 0) {
            break;
          }
          LOG.info("Remaining execution time: " + timeout / 60000 + " min");
          Threads.sleep(rem);
          timeout -= rem;
        }
      }
    }

    public class TimeBoundedMultiThreadedReaderThread
      extends MultiThreadedReader.HBaseReaderThread {

      public TimeBoundedMultiThreadedReaderThread(int readerId) throws IOException {
        super(readerId);
      }

      @Override
      protected Get createGet(long keyToRead) throws IOException {
        Get get = super.createGet(keyToRead);
        get.setConsistency(Consistency.TIMELINE);
        return get;
      }

      @Override
      protected long getNextKeyToRead() {
        // Always read a uniformly random key, assuming that the writer has finished writing
        // all keys.
        return startKey + ThreadLocalRandom.current().nextLong(endKey - startKey);
      }

      @Override
      protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
        Result[] results, Table table, boolean isNullExpected) throws IOException {
        super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table,
          isNullExpected);
        for (Result r : results) {
          if (r.isStale()) {
            staleReads.incrementAndGet();
          }
        }
        // We do not actually cancel the reads when the timeout passes; we wait for the RPC to
        // complete, and if the request took longer than the timeout we count it as an error.
        if (elapsedNano > timeoutNano) {
          timedOutReads.incrementAndGet();
          numReadFailures.addAndGet(1); // fail the test
          for (Result r : results) {
            LOG.error("FAILED FOR " + r);
            List<HRegionLocation> locs;
            try (RegionLocator locator = connection.getRegionLocator(tableName)) {
              locs = locator.getRegionLocations(r.getRow());
            }
            for (HRegionLocation h : locs) {
              LOG.error("LOCATION " + h);
            }
          }
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret =
      ToolRunner.run(conf, new IntegrationTestTimeBoundedRequestsWithRegionReplicas(), args);
    System.exit(ret);
  }
}