/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;

/**
 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of the region server's
 * bulkLoad functionality.
 */
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, LargeTests.class })
public class TestHRegionServerBulkLoad {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHRegionServerBulkLoad.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
  protected static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  protected final static Configuration conf = UTIL.getConfiguration();
  protected final static byte[] QUAL = Bytes.toBytes("qual");
  protected final static int NUM_CFS = 10;
  private int sleepDuration;
  public static int BLOCKSIZE = 64 * 1024;
  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;

  protected final static byte[][] families = new byte[NUM_CFS][];
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

  @Parameters
  public static final Collection<Object[]> parameters() {
    int[] sleepDurations = new int[] { 0, 30000 };
    List<Object[]> configurations = new ArrayList<>();
    for (int i : sleepDurations) {
      configurations.add(new Object[] { i });
    }
    return configurations;
  }

  public TestHRegionServerBulkLoad(int duration) {
    this.sleepDuration = duration;
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf.setInt("hbase.rpc.timeout", 10 * 1000);
  }

  /**
   * Create a rowkey compatible with
   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
   */
  public static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }
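  // For illustration: rowkey(7) yields "row_00000007" and family(3) yields "family_0003";
  // the zero padding keeps lexicographic row order aligned with numeric order.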
  /**
   * Create an HFile with the given number of rows with a specified value.
   */
  public static void createHFile(FileSystem fs, Path path, byte[] family, byte[] qualifier,
    byte[] value, int numRows) throws IOException {
    HFileContext context =
      new HFileContextBuilder().withBlockSize(BLOCKSIZE).withCompression(COMPRESSION).build();
    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
      .withFileContext(context).create();
    long now = EnvironmentEdgeManager.currentTime();
    try {
      // write numRows cells, all sharing the same timestamp and value
      for (int i = 0; i < numRows; i++) {
        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
        writer.append(kv);
      }
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now));
    } finally {
      writer.close();
    }
  }

  /**
   * Thread that repeatedly bulk-loads HFiles into the table. Each iteration loads one HFile per
   * column family (NUM_CFS files), and each loaded file occupies open file handles. So every 5
   * iterations it requests a region compaction to reduce the number of open file handles.
   */
  public static class AtomicHFileLoader extends RepeatingTestThread {
    final AtomicLong numBulkLoads = new AtomicLong();
    final AtomicLong numCompactions = new AtomicLong();
    private TableName tableName;

    public AtomicHFileLoader(TableName tableName, TestContext ctx, byte[][] targetFamilies)
      throws IOException {
      super(ctx);
      this.tableName = tableName;
    }

    @Override
    public void doAnAction() throws Exception {
      long iteration = numBulkLoads.getAndIncrement();
      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));

      // create HFiles for different column families
      FileSystem fs = UTIL.getTestFileSystem();
      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
      final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
      for (int i = 0; i < NUM_CFS; i++) {
        Path hfile = new Path(dir, family(i));
        byte[] fam = Bytes.toBytes(family(i));
        createHFile(fs, hfile, fam, QUAL, val, 1000);
        famPaths.add(new Pair<>(fam, hfile.toString()));
      }

      // bulk load HFiles
      final ClusterConnection conn = (ClusterConnection) UTIL.getConnection();
      Table table = conn.getTable(tableName);
      final String bulkToken =
        new SecureBulkLoadClient(UTIL.getConfiguration(), table).prepareBulkLoad(conn);
      ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, tableName,
        Bytes.toBytes("aaa"), new RpcControllerFactory(UTIL.getConfiguration()).newController(),
        HConstants.PRIORITY_UNSET, Collections.emptyMap()) {
        @Override
        public Void rpcCall() throws Exception {
          LOG.debug("Going to connect to server " + getLocation() + " for row "
            + Bytes.toStringBinary(getRow()));
          SecureBulkLoadClient secureClient = null;
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          try (Table table = conn.getTable(getTableName())) {
            secureClient = new SecureBulkLoadClient(UTIL.getConfiguration(), table);
            secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, true, null,
              bulkToken);
          }
          return null;
        }
      };
      RpcRetryingCallerFactory factory =
        new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration());
      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
      caller.callWithRetries(callable, Integer.MAX_VALUE);
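      // All NUM_CFS families are handed to the region in the single secureBulkLoadHFiles
      // call above, so each iteration's files should become visible together; this is the
      // atomicity property the AtomicScanReader threads check for.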
      // Periodically do compaction to reduce the number of open file handles.
      if (numBulkLoads.get() % 5 == 0) {
        // 5 bulk loads * 10 files each have accumulated since the last compaction
        callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
          new RpcControllerFactory(UTIL.getConfiguration()).newController(),
          HConstants.PRIORITY_UNSET, Collections.emptyMap()) {
          @Override
          protected Void rpcCall() throws Exception {
            LOG.debug("compacting " + getLocation() + " for row " + Bytes.toStringBinary(getRow()));
            AdminProtos.AdminService.BlockingInterface server =
              conn.getAdmin(getLocation().getServerName());
            CompactRegionRequest request = RequestConverter
              .buildCompactRegionRequest(getLocation().getRegionInfo().getRegionName(), true, null);
            server.compactRegion(null, request);
            numCompactions.incrementAndGet();
            return null;
          }
        };
        caller.callWithRetries(callable, Integer.MAX_VALUE);
      }
    }
  }

  public static class MyObserver implements RegionCoprocessor, RegionObserver {
    static int sleepDuration;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
      try {
        Thread.sleep(sleepDuration);
      } catch (InterruptedException ie) {
        IOException ioe = new InterruptedIOException();
        ioe.initCause(ie);
        throw ioe;
      }
      return scanner;
    }
  }
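  // With the parameterized sleepDuration of 30000 ms, MyObserver holds each compaction
  // at its start for 30 seconds, so bulk loads and scans proceed while a compaction is
  // pending; with 0 ms the compaction path runs unimpeded.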
  /**
   * Thread that does full scans of the table looking for any partially completed rows.
   */
  public static class AtomicScanReader extends RepeatingTestThread {
    byte[][] targetFamilies;
    Table table;
    AtomicLong numScans = new AtomicLong();
    AtomicLong numRowsScanned = new AtomicLong();
    TableName TABLE_NAME;

    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx, byte[][] targetFamilies)
      throws IOException {
      super(ctx);
      this.TABLE_NAME = TABLE_NAME;
      this.targetFamilies = targetFamilies;
      table = UTIL.getConnection().getTable(TABLE_NAME);
    }

    @Override
    public void doAnAction() throws Exception {
      Scan s = new Scan();
      for (byte[] family : targetFamilies) {
        s.addFamily(family);
      }
      ResultScanner scanner = table.getScanner(s);

      for (Result res : scanner) {
        byte[] lastRow = null, lastFam = null, lastQual = null;
        byte[] gotValue = null;
        for (byte[] family : targetFamilies) {
          byte[] qualifier = QUAL;
          byte[] thisValue = res.getValue(family, qualifier);
          if (gotValue != null && thisValue != null && !Bytes.equals(gotValue, thisValue)) {
            StringBuilder msg = new StringBuilder();
            msg.append("Failed on scan ").append(numScans).append(" after scanning ")
              .append(numRowsScanned).append(" rows!\n");
            msg.append("Current was " + Bytes.toString(res.getRow()) + "/" + Bytes.toString(family)
              + ":" + Bytes.toString(qualifier) + " = " + Bytes.toString(thisValue) + "\n");
            msg.append("Previous was " + Bytes.toString(lastRow) + "/" + Bytes.toString(lastFam)
              + ":" + Bytes.toString(lastQual) + " = " + Bytes.toString(gotValue));
            throw new RuntimeException(msg.toString());
          }

          lastFam = family;
          lastQual = qualifier;
          lastRow = res.getRow();
          gotValue = thisValue;
        }
        numRowsScanned.getAndIncrement();
      }
      numScans.getAndIncrement();
    }
  }

  /**
   * Creates a table with the given table name and specified number of column families if the
   * table does not already exist.
   */
  public void setupTable(TableName table, int cfs) throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(table);
      htd.addCoprocessor(MyObserver.class.getName());
      MyObserver.sleepDuration = this.sleepDuration;
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }

      UTIL.getAdmin().createTable(htd);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  /**
   * Atomic bulk load.
   */
  @Test
  public void testAtomicBulkLoad() throws Exception {
    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");

    int millisToRun = 30000;
    int numScanners = 50;

    // Set createWALDir to true and use default values for other options.
    UTIL.startMiniCluster(StartMiniClusterOption.builder().createWALDir(true).build());
    try {
      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
      FindBulkHBaseListener listener = new FindBulkHBaseListener();
      log.registerWALActionsListener(listener);
      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
      assertThat(listener.isFound(), is(true));
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
    throws Exception {
    setupTable(tableName, 10);

    TestContext ctx = new TestContext(UTIL.getConfiguration());

    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
    ctx.addThread(loader);

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Loaders:");
    LOG.info(" loaded " + loader.numBulkLoads.get());
    LOG.info(" compactions " + loader.numCompactions.get());

    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info(" scanned " + scanner.numScans.get());
      LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
    }
  }
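  // Every bulk load also appends a BULK_LOAD marker entry to the WAL; testAtomicBulkLoad
  // asserts via FindBulkHBaseListener (below) that at least one such marker was observed.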
  /**
   * Run test on an HBase instance for 5 minutes. This assumes that the table under test only has
   * a single region.
   */
  public static void main(String[] args) throws Exception {
    try {
      Configuration c = HBaseConfiguration.create();
      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
      test.setConf(c);
      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
    } finally {
      System.exit(0); // something hangs (believe it is lru threadpool)
    }
  }

  private void setConf(Configuration c) {
    UTIL = new HBaseTestingUtility(c);
  }

  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
    private boolean found = false;

    @Override
    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
      for (Cell cell : logEdit.getCells()) {
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
        for (Map.Entry<String, Object> entry : kv.toStringMap().entrySet()) {
          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
            found = true;
          }
        }
      }
    }

    public boolean isFound() {
      return found;
    }
  }
}