/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of the region
 * server's bulkLoad functionality.
 */
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, LargeTests.class })
public class TestHRegionServerBulkLoad {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHRegionServerBulkLoad.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
  protected static HBaseTestingUtil UTIL = new HBaseTestingUtil();
  protected final static Configuration conf = UTIL.getConfiguration();
  protected final static byte[] QUAL = Bytes.toBytes("qual");
  protected final static int NUM_CFS = 10;
  private int sleepDuration;
  public static int BLOCKSIZE = 64 * 1024;
  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;

  protected final static byte[][] families = new byte[NUM_CFS][];
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

  @Parameters
  public static final Collection<Object[]> parameters() {
    int[] sleepDurations = new int[] { 0, 30000 };
    List<Object[]> configurations = new ArrayList<>();
    for (int i : sleepDurations) {
      configurations.add(new Object[] { i });
    }
    return configurations;
  }

  public TestHRegionServerBulkLoad(int duration) {
    this.sleepDuration = duration;
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf.setInt("hbase.rpc.timeout", 10 * 1000);
  }

  /**
   * Create a rowkey compatible with
   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
   */
  public static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  /**
   * Create an HFile at the given path containing the given number of rows, all with the
   * specified value.
   */
  public static void createHFile(FileSystem fs, Path path, byte[] family, byte[] qualifier,
    byte[] value, int numRows) throws IOException {
    HFileContext context =
      new HFileContextBuilder().withBlockSize(BLOCKSIZE).withCompression(COMPRESSION).build();
    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
      .withFileContext(context).create();
    long now = EnvironmentEdgeManager.currentTime();
    try {
      for (int i = 0; i < numRows; i++) {
        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
        writer.append(kv);
      }
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now));
    } finally {
      writer.close();
    }
  }
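
  /*
   * A minimal sketch (mirroring AtomicHFileLoader.doAnAction below) of how the HFiles produced
   * by createHFile are handed to the bulk-load machinery: one file per column family, grouped
   * into a family-to-files map. The "dir", "value", and "tableName" names are illustrative.
   *
   *   Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
   *   for (int i = 0; i < NUM_CFS; i++) {
   *     Path hfile = new Path(dir, family(i));
   *     createHFile(fs, hfile, Bytes.toBytes(family(i)), QUAL, value, 1000);
   *     family2Files.put(Bytes.toBytes(family(i)), Collections.singletonList(hfile));
   *   }
   *   BulkLoadHFiles.create(conf).bulkLoad(tableName, family2Files);
   */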

  /**
   * Thread that repeatedly bulk loads HFiles into the table. Each iteration loads one HFile per
   * column family (NUM_CFS = 10 files), so every five iterations (up to 50 new store files) it
   * requests a region compaction to reduce the number of open file handles.
   */
  public static class AtomicHFileLoader extends RepeatingTestThread {
    final AtomicLong numBulkLoads = new AtomicLong();
    final AtomicLong numCompactions = new AtomicLong();
    private TableName tableName;

    public AtomicHFileLoader(TableName tableName, TestContext ctx, byte targetFamilies[][])
      throws IOException {
      super(ctx);
      this.tableName = tableName;
    }

    @Override
    public void doAnAction() throws Exception {
      long iteration = numBulkLoads.getAndIncrement();
      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));

      // create HFiles for different column families
      FileSystem fs = UTIL.getTestFileSystem();
      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
      Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (int i = 0; i < NUM_CFS; i++) {
        Path hfile = new Path(dir, family(i));
        byte[] fam = Bytes.toBytes(family(i));
        createHFile(fs, hfile, fam, QUAL, val, 1000);
        family2Files.put(fam, Collections.singletonList(hfile));
      }
      // bulk load HFiles
      BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(tableName, family2Files);
      final Connection conn = UTIL.getConnection();
      // Periodically compact to reduce the number of open file handles
      // (5 bulk loads * 10 HFiles = up to 50 new store files between compactions).
      if (numBulkLoads.get() % 5 == 0) {
        try (RegionLocator locator = conn.getRegionLocator(tableName)) {
          HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes("aaa"), true);
          conn.getAdmin().compactRegion(loc.getRegion().getRegionName());
          numCompactions.incrementAndGet();
        }
      }
    }
  }

  /**
   * Region observer that delays compactions by sleeping in preCompact, so that compactions stay
   * in flight while bulk loads and scans run concurrently.
   */
  public static class MyObserver implements RegionCoprocessor, RegionObserver {
    static int sleepDuration;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> e,
      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
      try {
        Thread.sleep(sleepDuration);
      } catch (InterruptedException ie) {
        IOException ioe = new InterruptedIOException();
        ioe.initCause(ie);
        throw ioe;
      }
      return scanner;
    }
  }
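
  /*
   * The atomicity invariant checked by the reader below: each bulk load writes the same value
   * (String.format("%010d", iteration)) to every column family of every row in one operation,
   * so a scan should never observe two different values within a single row. For example, a row
   * returning family_0003:qual = "0000000007" but family_0004:qual = "0000000006" would mean the
   * scan saw a half-applied (non-atomic) bulk load, and the reader fails the test.
   */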
  /**
   * Thread that does full scans of the table looking for any partially completed rows.
   */
  public static class AtomicScanReader extends RepeatingTestThread {
    byte targetFamilies[][];
    Table table;
    AtomicLong numScans = new AtomicLong();
    AtomicLong numRowsScanned = new AtomicLong();
    TableName TABLE_NAME;

    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx, byte targetFamilies[][])
      throws IOException {
      super(ctx);
      this.TABLE_NAME = TABLE_NAME;
      this.targetFamilies = targetFamilies;
      table = UTIL.getConnection().getTable(TABLE_NAME);
    }

    @Override
    public void doAnAction() throws Exception {
      Scan s = new Scan();
      for (byte[] family : targetFamilies) {
        s.addFamily(family);
      }
      ResultScanner scanner = table.getScanner(s);

      for (Result res : scanner) {
        byte[] lastRow = null, lastFam = null, lastQual = null;
        byte[] gotValue = null;
        for (byte[] family : targetFamilies) {
          byte qualifier[] = QUAL;
          byte thisValue[] = res.getValue(family, qualifier);
          if (gotValue != null && thisValue != null && !Bytes.equals(gotValue, thisValue)) {
            StringBuilder msg = new StringBuilder();
            msg.append("Failed on scan ").append(numScans).append(" after scanning ")
              .append(numRowsScanned).append(" rows!\n");
            msg.append("Current was " + Bytes.toString(res.getRow()) + "/" + Bytes.toString(family)
              + ":" + Bytes.toString(qualifier) + " = " + Bytes.toString(thisValue) + "\n");
            msg.append("Previous was " + Bytes.toString(lastRow) + "/" + Bytes.toString(lastFam)
              + ":" + Bytes.toString(lastQual) + " = " + Bytes.toString(gotValue));
            throw new RuntimeException(msg.toString());
          }

          lastFam = family;
          lastQual = qualifier;
          lastRow = res.getRow();
          gotValue = thisValue;
        }
        numRowsScanned.getAndIncrement();
      }
      numScans.getAndIncrement();
    }
  }

  /**
   * Creates a table with the given name and the specified number of column families if it does
   * not already exist.
   */
  public void setupTable(TableName table, int cfs) throws IOException {
    try {
      LOG.info("Creating table " + table);
      TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(table);

      tableDescriptorBuilder.setCoprocessor(MyObserver.class.getName());
      MyObserver.sleepDuration = this.sleepDuration;
      for (int i = 0; i < cfs; i++) {
        ColumnFamilyDescriptor columnFamilyDescriptor =
          ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family(i))).build();
        tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
      }

      UTIL.getAdmin().createTable(tableDescriptorBuilder.build());
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }
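
  /*
   * In addition to scanning for torn rows, the test below verifies that each bulk load leaves a
   * trace in the WAL: bulk loads append a bulk-load event marker (WALEdit.BULK_LOAD) to the log,
   * which the FindBulkHBaseListener at the bottom of this class watches for.
   */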
  /**
   * Atomic bulk load.
   */
  @Test
  public void testAtomicBulkLoad() throws Exception {
    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");

    int millisToRun = 30000;
    int numScanners = 50;

    // Set createWALDir to true and use default values for other options.
    UTIL.startMiniCluster(StartTestingClusterOption.builder().createWALDir(true).build());
    try {
      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
      FindBulkHBaseListener listener = new FindBulkHBaseListener();
      log.registerWALActionsListener(listener);
      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
      assertThat(listener.isFound(), is(true));
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
    throws Exception {
    setupTable(tableName, 10);

    TestContext ctx = new TestContext(UTIL.getConfiguration());

    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
    ctx.addThread(loader);

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Loaders:");
    LOG.info("  loaded " + loader.numBulkLoads.get());
    LOG.info("  compactions " + loader.numCompactions.get());

    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info("  scanned " + scanner.numScans.get());
      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
    }
  }

  /**
   * Run the test on an HBase instance for 5 minutes. This assumes that the table under test only
   * has a single region.
   */
  public static void main(String args[]) throws Exception {
    try {
      Configuration c = HBaseConfiguration.create();
      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
      test.setConf(c);
      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
    } finally {
      System.exit(0); // something hangs (believe it is lru threadpool)
    }
  }

  private void setConf(Configuration c) {
    UTIL = new HBaseTestingUtil(c);
  }

  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
    private boolean found = false;

    @Override
    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
      for (Cell cell : logEdit.getCells()) {
        KeyValue kv = KeyValueUtil.ensureKeyValue((ExtendedCell) cell);
        for (Map.Entry<String, Object> entry : kv.toStringMap().entrySet()) {
          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
            found = true;
          }
        }
      }
    }

    public boolean isFound() {
      return found;
    }
  }
}