/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import static org.apache.hadoop.hbase.IntegrationTestingUtility.DEFAULT_REGIONS_PER_SERVER;
import static org.apache.hadoop.hbase.IntegrationTestingUtility.PRESPLIT_TEST_TABLE;
import static org.apache.hadoop.hbase.IntegrationTestingUtility.PRESPLIT_TEST_TABLE_KEY;
import static org.apache.hadoop.hbase.IntegrationTestingUtility.REGIONS_PER_SERVER_KEY;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.test.util.CRC64;
import org.apache.hadoop.hbase.test.util.warc.WARCInputFormat;
import org.apache.hadoop.hbase.test.util.warc.WARCRecord;
import org.apache.hadoop.hbase.test.util.warc.WARCWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * This integration test loads successful resource retrieval records from the Common Crawl
 * (https://commoncrawl.org/) public dataset into an HBase table and writes records that can be used
 * to later verify the presence and integrity of those records.
 * <p>
 * Run like: <blockquote> ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl
 * \<br>
 * -Dfs.s3n.awsAccessKeyId=<AWS access key> \<br>
 * -Dfs.s3n.awsSecretAccessKey=<AWS secret key> \<br>
 * /path/to/test-CC-MAIN-2021-10-warc.paths.gz \<br>
 * /path/to/tmp/warc-loader-output </blockquote>
 * <p>
 * Access to the Common Crawl dataset in S3 is made available to anyone by Amazon AWS, but Hadoop's
 * S3N filesystem still requires valid access credentials to initialize.
 * <p>
 * The input path can either specify a directory or a file. The file may optionally be compressed
 * with gzip. If a directory, the loader expects the directory to contain one or more WARC files
 * from the Common Crawl dataset. If a file, the loader expects a list of Hadoop S3N URIs which
 * point to S3 locations for one or more WARC files from the Common Crawl dataset, one URI per line.
 * Lines should be terminated with the UNIX line terminator.
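 * <p>
 * For example (illustrative only; the actual S3 bucket and key layout may differ), a line in the
 * URI list might look like:
 * <tt>s3n://commoncrawl/crawl-data/CC-MAIN-2021-10/segments/.../warc/....warc.gz</tt>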
 * <p>
 * Included in hbase-it/src/test/resources/CC-MAIN-2021-10-warc.paths.gz is a list of all WARC files
 * comprising the Q1 2021 crawl archive. There are 64,000 WARC files in this data set, each
 * containing ~1GB of gzipped data. The WARC files contain several record types, such as metadata,
 * request, and response, but we only load the response record types. If the HBase table schema does
 * not specify compression (the default), there is roughly a 10x expansion. Loading the full crawl
 * archive results in a table approximately 640 TB in size.
 * <p>
 * The loader can optionally drive read load during ingest by incrementing counters for each URL
 * discovered in content. Add <tt>-DIntegrationTestLoadCommonCrawl.increments=true</tt> to the
 * command line to enable.
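 * <p>
 * For example, a load with increments enabled might be run like: <blockquote> ./bin/hbase
 * org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl \<br>
 * -DIntegrationTestLoadCommonCrawl.increments=true \<br>
 * -Dfs.s3n.awsAccessKeyId=<AWS access key> \<br>
 * -Dfs.s3n.awsSecretAccessKey=<AWS secret key> \<br>
 * /path/to/test-CC-MAIN-2021-10-warc.paths.gz \<br>
 * /path/to/tmp/warc-loader-output </blockquote>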
 * <p>
 * You can also split the Loader and Verify stages:
 * <p>
 * Load with: <blockquote> ./bin/hbase
 * 'org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl$Loader' \<br>
 * -files /path/to/hadoop-aws.jar \<br>
 * -Dfs.s3n.awsAccessKeyId=<AWS access key> \<br>
 * -Dfs.s3n.awsSecretAccessKey=<AWS secret key> \<br>
 * /path/to/test-CC-MAIN-2021-10-warc.paths.gz \<br>
 * /path/to/tmp/warc-loader-output </blockquote>
 * <p>
 * Note: The hadoop-aws jar will be needed at runtime to instantiate the S3N filesystem. Use the
 * <tt>-files</tt> ToolRunner argument to add it.
 * <p>
 * Verify with: <blockquote> ./bin/hbase
 * 'org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl$Verify' \<br>
 * /path/to/tmp/warc-loader-output </blockquote>
 * <p>
 */
public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLoadCommonCrawl.class);

  static final String TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table";
  static final String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl";

  static final String INCREMENTS_NAME_KEY = "IntegrationTestLoadCommonCrawl.increments";
  static final boolean DEFAULT_INCREMENTS = false;

  static final int MAX_INFLIGHT = 1000;
  static final int INFLIGHT_PAUSE_MS = 100;

  static final byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c");
  static final byte[] INFO_FAMILY_NAME = Bytes.toBytes("i");
  static final byte[] URL_FAMILY_NAME = Bytes.toBytes("u");
  static final byte[] SEP = Bytes.toBytes(":");
  static final byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY;
  static final byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l");
  static final byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t");
  static final byte[] CRC_QUALIFIER = Bytes.toBytes("c");
  static final byte[] DATE_QUALIFIER = Bytes.toBytes("d");
  static final byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a");
  static final byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");
  static final byte[] REF_QUALIFIER = Bytes.toBytes("ref");

  public static enum Counts {
    REFERENCED,
    UNREFERENCED,
    CORRUPT,
    RPC_BYTES_WRITTEN,
    RPC_TIME_MS,
  }

  protected Path warcFileInputDir = null;
  protected Path outputDir = null;
  protected String[] args;

  protected int runLoader(final Path warcFileInputDir, final Path outputDir) throws Exception {
    Loader loader = new Loader();
    loader.setConf(conf);
    return loader.run(warcFileInputDir, outputDir);
  }

  protected int runVerify(final Path inputDir) throws Exception {
    Verify verify = new Verify();
    verify.setConf(conf);
    return verify.run(inputDir);
  }

  @Override
  public int run(String[] args) {
    if (args.length > 0) {
      warcFileInputDir = new Path(args[0]);
      if (args.length > 1) {
        outputDir = new Path(args[1]);
      }
    }
    try {
      if (warcFileInputDir == null) {
        throw new IllegalArgumentException("WARC input file or directory not specified");
      }
      if (outputDir == null) {
        throw new IllegalArgumentException("Output directory not specified");
      }
      int res = runLoader(warcFileInputDir, outputDir);
      if (res != 0) {
        LOG.error("Loader failed");
        return -1;
      }
      return runVerify(outputDir);
    } catch (Exception e) {
      LOG.error("Tool failed with exception", e);
      return -1;
    }
  }

  @Override
  protected void processOptions(final CommandLine cmd) {
    processBaseOptions(cmd);
    args = cmd.getArgs();
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    boolean isDistributed = util.isDistributedCluster();
    util.initializeCluster(isDistributed ? 1 : 3);
    if (!isDistributed) {
      util.startMiniMapReduceCluster();
    }
    this.setConf(util.getConfiguration());
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  static TableName getTablename(final Configuration c) {
    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  @Override
  public TableName getTablename() {
    return getTablename(getConf());
  }

  @Override
  protected Set<String> getColumnFamilies() {
    Set<String> families = new HashSet<>();
    families.add(Bytes.toString(CONTENT_FAMILY_NAME));
    families.add(Bytes.toString(INFO_FAMILY_NAME));
    families.add(Bytes.toString(URL_FAMILY_NAME));
    return families;
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    return ToolRunner.run(getConf(), this, args);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestLoadCommonCrawl(), args);
    System.exit(ret);
  }

  public static class HBaseKeyWritable implements Writable {

    private byte[] row;
    private int rowOffset;
    private int rowLength;
    private byte[] family;
    private int familyOffset;
    private int familyLength;
    private byte[] qualifier;
    private int qualifierOffset;
    private int qualifierLength;
    private long ts;

    public HBaseKeyWritable() {
    }

    public HBaseKeyWritable(byte[] row, int rowOffset, int rowLength, byte[] family,
      int familyOffset, int familyLength, byte[] qualifier, int qualifierOffset,
      int qualifierLength, long ts) {
      this.row = row;
      this.rowOffset = rowOffset;
      this.rowLength = rowLength;
      this.family = family;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifier = qualifier;
      this.qualifierOffset = qualifierOffset;
      this.qualifierLength = qualifierLength;
      this.ts = ts;
    }

    public HBaseKeyWritable(byte[] row, byte[] family, byte[] qualifier, long ts) {
      this(row, 0, row.length, family, 0, family.length, qualifier, 0,
        qualifier != null ? qualifier.length : 0, ts);
    }

    public HBaseKeyWritable(byte[] row, byte[] family, byte[] qualifier) {
      this(row, family, qualifier, Long.MAX_VALUE);
    }

    public HBaseKeyWritable(byte[] row, byte[] family, long ts) {
      this(row, family, HConstants.EMPTY_BYTE_ARRAY, ts);
    }

    public HBaseKeyWritable(byte[] row, byte[] family) {
      this(row, family, Long.MAX_VALUE);
    }

    public HBaseKeyWritable(Cell cell) {
      this(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(),
        cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(),
        cell.getQualifierOffset(), cell.getQualifierLength(), cell.getTimestamp());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      this.row = Bytes.toBytes(in.readUTF());
      this.rowOffset = 0;
      this.rowLength = row.length;
      this.family = Bytes.toBytes(in.readUTF());
      this.familyOffset = 0;
      this.familyLength = family.length;
      this.qualifier = Bytes.toBytes(in.readUTF());
      this.qualifierOffset = 0;
      this.qualifierLength = qualifier.length;
      this.ts = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(new String(row, rowOffset, rowLength, StandardCharsets.UTF_8));
      out.writeUTF(new String(family, familyOffset, familyLength, StandardCharsets.UTF_8));
      if (qualifier != null) {
        out.writeUTF(
          new String(qualifier, qualifierOffset, qualifierLength, StandardCharsets.UTF_8));
      } else {
        out.writeUTF("");
      }
      out.writeLong(ts);
    }

    public byte[] getRowArray() {
      return row;
    }

    public void setRow(byte[] row) {
      this.row = row;
    }

    public int getRowOffset() {
      return rowOffset;
    }

    public void setRowOffset(int rowOffset) {
      this.rowOffset = rowOffset;
    }

    public int getRowLength() {
      return rowLength;
    }

    public void setRowLength(int rowLength) {
      this.rowLength = rowLength;
    }

    public byte[] getFamilyArray() {
      return family;
    }

    public void setFamily(byte[] family) {
      this.family = family;
    }

    public int getFamilyOffset() {
      return familyOffset;
    }

    public void setFamilyOffset(int familyOffset) {
      this.familyOffset = familyOffset;
    }

    public int getFamilyLength() {
      return familyLength;
    }

    public void setFamilyLength(int familyLength) {
      this.familyLength = familyLength;
    }

    public byte[] getQualifierArray() {
      return qualifier;
    }

    public void setQualifier(byte[] qualifier) {
      this.qualifier = qualifier;
    }

    public int getQualifierOffset() {
      return qualifierOffset;
    }

    public void setQualifierOffset(int qualifierOffset) {
      this.qualifierOffset = qualifierOffset;
    }

    public int getQualifierLength() {
      return qualifierLength;
    }

    public void setQualifierLength(int qualifierLength) {
      this.qualifierLength = qualifierLength;
    }

    public long getTimestamp() {
      return ts;
    }

    public void setTimestamp(long ts) {
      this.ts = ts;
    }
  }

  public static class Loader extends Configured implements Tool {

    private static final Logger LOG = LoggerFactory.getLogger(Loader.class);
    private static final String USAGE = "Loader <warcInputDir | warcFileList> <outputDir>";

    void createSchema(final TableName tableName) throws IOException {
      try (Connection conn = ConnectionFactory.createConnection(getConf());
        Admin admin = conn.getAdmin()) {
        if (!admin.tableExists(tableName)) {

          ColumnFamilyDescriptorBuilder contentFamilyBuilder =
            ColumnFamilyDescriptorBuilder.newBuilder(CONTENT_FAMILY_NAME).setMaxVersions(1000)
              .setDataBlockEncoding(DataBlockEncoding.NONE).setBloomFilterType(BloomType.ROW);

          ColumnFamilyDescriptorBuilder infoFamilyBuilder =
            ColumnFamilyDescriptorBuilder.newBuilder(INFO_FAMILY_NAME).setMaxVersions(1000)
              .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1)
              .setBloomFilterType(BloomType.ROWCOL).setBlocksize(8 * 1024);

          ColumnFamilyDescriptorBuilder urlFamilyBuilder =
            ColumnFamilyDescriptorBuilder.newBuilder(URL_FAMILY_NAME).setMaxVersions(1000)
              .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1)
              .setBloomFilterType(BloomType.ROWCOL).setBlocksize(8 * 1024);

          Set<ColumnFamilyDescriptor> families = new HashSet<>();
          families.add(contentFamilyBuilder.build());
          families.add(infoFamilyBuilder.build());
          families.add(urlFamilyBuilder.build());

          TableDescriptor tableDescriptor =
            TableDescriptorBuilder.newBuilder(tableName).setColumnFamilies(families).build();

          if (getConf().getBoolean(PRESPLIT_TEST_TABLE_KEY, PRESPLIT_TEST_TABLE)) {
            int numberOfServers = admin.getRegionServers().size();
            if (numberOfServers == 0) {
              throw new IllegalStateException("No live regionservers");
            }
            int regionsPerServer =
              getConf().getInt(REGIONS_PER_SERVER_KEY, DEFAULT_REGIONS_PER_SERVER);
            int totalNumberOfRegions = numberOfServers * regionsPerServer;
            LOG.info("Creating test table: " + tableDescriptor);
            LOG.info("Number of live regionservers: " + numberOfServers + ", "
              + "pre-splitting table into " + totalNumberOfRegions + " regions "
              + "(default regions per server: " + regionsPerServer + ")");
            byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
            admin.createTable(tableDescriptor, splits);
          } else {
            LOG.info("Creating test table: " + tableDescriptor);
            admin.createTable(tableDescriptor);
          }
        }
      } catch (MasterNotRunningException e) {
        LOG.error("Master not running", e);
        throw new IOException(e);
      }
    }

    int run(final Path warcFileInput, final Path outputDir)
      throws IOException, ClassNotFoundException, InterruptedException {

      createSchema(getTablename(getConf()));

      final Job job = Job.getInstance(getConf());
      job.setJobName(Loader.class.getName());
      job.setNumReduceTasks(0);
      job.setJarByClass(getClass());
      job.setMapperClass(LoaderMapper.class);
      job.setInputFormatClass(WARCInputFormat.class);
      final FileSystem fs = FileSystem.get(warcFileInput.toUri(), getConf());
      if (fs.getFileStatus(warcFileInput).isDirectory()) {
        LOG.info("Using directory as WARC input path: " + warcFileInput);
        FileInputFormat.setInputPaths(job, warcFileInput);
      } else if (warcFileInput.toUri().getScheme().equals("file")) {
        LOG.info("Getting WARC input paths from file: " + warcFileInput);
        final List<Path> paths = new ArrayList<Path>();
        try (FSDataInputStream is = fs.open(warcFileInput)) {
          InputStreamReader reader;
          if (warcFileInput.getName().toLowerCase().endsWith(".gz")) {
            reader = new InputStreamReader(new GZIPInputStream(is), StandardCharsets.UTF_8);
          } else {
            reader = new InputStreamReader(is, StandardCharsets.UTF_8);
          }
          try (BufferedReader br = new BufferedReader(reader)) {
            String line;
            while ((line = br.readLine()) != null) {
              paths.add(new Path(line));
            }
          }
        }
        LOG.info("Read " + paths.size() + " WARC input paths from " + warcFileInput);
        FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
      } else {
        FileInputFormat.setInputPaths(job, warcFileInput);
      }
      job.setOutputFormatClass(SequenceFileOutputFormat.class);
      SequenceFileOutputFormat.setOutputPath(job, outputDir);
      SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
      job.setOutputKeyClass(HBaseKeyWritable.class);
      job.setOutputValueClass(BytesWritable.class);
      TableMapReduceUtil.addDependencyJars(job);
      // Increase max attempts because S3 might throttle aggressively and ultimately fail a task
      job.getConfiguration().setInt("mapred.map.max.attempts", 100);
      job.getConfiguration().setInt("mapreduce.map.maxattempts", 100);

      boolean success = job.waitForCompletion(true);
      if (!success) {
        LOG.error("Failure during job " + job.getJobID());
      }

      final Counters counters = job.getCounters();
      for (Counts c : Counts.values()) {
        long value = counters.findCounter(c).getValue();
        if (value != 0) {
          LOG.info(c + ": " + value);
        }
      }

      return success ? 0 : 1;
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 2) {
        System.err.println(USAGE);
        return 1;
      }
      try {
        Path warcFileInput = new Path(args[0]);
        Path outputDir = new Path(args[1]);
        return run(warcFileInput, outputDir);
      } catch (NumberFormatException e) {
        System.err.println("Parsing loader arguments failed: " + e.getMessage());
        System.err.println(USAGE);
        return 1;
      }
    }

    public static void main(String[] args) throws Exception {
      System.exit(ToolRunner.run(HBaseConfiguration.create(), new Loader(), args));
    }

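    /**
     * Parses Common Crawl WARC response records, stores the content and metadata of each
     * qualifying record in the test table, and writes out one record per stored field for the
     * Verify stage to check later.
     */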
    public static class LoaderMapper
      extends Mapper<LongWritable, WARCWritable, HBaseKeyWritable, BytesWritable> {

      protected AsyncConnection conn;
      protected AsyncTable<ScanResultConsumer> table;
      protected ExecutorService executor;
      protected AtomicLong inflight = new AtomicLong();
      protected boolean doIncrements;

      @Override
      protected void setup(final Context context) throws IOException, InterruptedException {
        executor = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors());
        Configuration conf = context.getConfiguration();
        doIncrements = conf.getBoolean(INCREMENTS_NAME_KEY, DEFAULT_INCREMENTS);
        try {
          conn = ConnectionFactory.createAsyncConnection(conf).get();
          table = conn.getTable(getTablename(conf), executor);
        } catch (ExecutionException e) {
          throw new IOException(e);
        }
      }

      @Override
      protected void cleanup(final Context context) throws IOException, InterruptedException {
        // Wait for all asynchronous operations still in flight to complete
        while (inflight.get() != 0) {
          LOG.info("Operations in flight, waiting");
          Thread.sleep(INFLIGHT_PAUSE_MS);
        }
        // Shut down the executor
        executor.shutdown();
        if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
          LOG.warn("Pool did not shut down cleanly");
        }
        // Close the connection
        try {
          conn.close();
        } catch (Exception e) {
          LOG.warn("Exception closing Connection", e);
        }
      }

      @Override
      protected void map(final LongWritable key, final WARCWritable value, final Context output)
        throws IOException, InterruptedException {
        final WARCRecord.Header warcHeader = value.getRecord().getHeader();
        final String recordID = warcHeader.getRecordID();
        final String targetURI = warcHeader.getTargetURI();
        if (warcHeader.getRecordType().equals("response") && targetURI != null) {
          final String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
          if (contentType != null) {
            // Make row key
            byte[] rowKey;
            try {
              rowKey = rowKeyFromTargetURI(targetURI);
            } catch (IllegalArgumentException e) {
              LOG.debug("Could not make a row key for record " + recordID + ", ignoring", e);
              return;
            } catch (URISyntaxException e) {
              LOG.warn(
                "Could not parse URI \"" + targetURI + "\" for record " + recordID + ", ignoring");
              return;
            }

            // Get the content and calculate the CRC64
            final byte[] content = value.getRecord().getContent();
            final CRC64 crc = new CRC64();
            crc.update(content);
            final long crc64 = crc.getValue();
            LOG.info("{}: content {} bytes, crc64={}", targetURI, content.length,
              Bytes.toHex(Bytes.toBytes(crc64)));

            // Store to HBase
            final long ts = getSequence();
            final Put put = new Put(rowKey);
            put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, ts, content);
            put.addColumn(INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts,
              Bytes.toBytes(content.length));
            put.addColumn(INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts, Bytes.toBytes(contentType));
            put.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER, ts, Bytes.toBytes(crc64));
            put.addColumn(INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts, Bytes.toBytes(targetURI));
            put.addColumn(INFO_FAMILY_NAME, DATE_QUALIFIER, ts,
              Bytes.toBytes(warcHeader.getDateString()));
            final String ipAddr = warcHeader.getField("WARC-IP-Address");
            if (ipAddr != null) {
              put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts, Bytes.toBytes(ipAddr));
            }
            // Apply backpressure: allow no more than MAX_INFLIGHT asynchronous operations pending
            long pending = inflight.incrementAndGet();
            while (pending > MAX_INFLIGHT) {
              LOG.info("Too many operations in flight, waiting");
              Thread.sleep(INFLIGHT_PAUSE_MS);
              pending = inflight.get();
            }
            final long putStartTime = System.currentTimeMillis();
            final CompletableFuture<Void> putFuture = table.put(put);
            addListener(putFuture, (r, e) -> {
              inflight.decrementAndGet();
              if (e == null) {
                output.getCounter(Counts.RPC_TIME_MS)
                  .increment(System.currentTimeMillis() - putStartTime);
                output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(put.heapSize());
              }
            });

            // Write records out for later verification, one per HBase field except for the
            // content record, which will be verified by CRC64.
            output.write(
              new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(content.length)));
            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(contentType)));
            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(crc64)));
            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(targetURI)));
            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, DATE_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(warcHeader.getDateString())));
            if (ipAddr != null) {
              output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts),
                new BytesWritable(Bytes.toBytes(ipAddr)));
            }

            if (doIncrements) {
              // The URLs cf is not tracked for correctness. For now it is used only to exercise
              // Increments, to drive some read load during ingest. They can be verified with a
              // reducer to sum increments per row and then compare the final count to the table
              // data. This is left as a future exercise.
              final byte[] refQual = Bytes.add(REF_QUALIFIER, SEP, rowKey);
              for (String refUri : extractUrls(content)) {
                try {
                  byte[] urlRowKey = rowKeyFromTargetURI(refUri);
                  LOG.debug(" -> {}", refUri);
                  final Increment increment = new Increment(urlRowKey);
                  increment.setTimestamp(ts);
                  increment.addColumn(URL_FAMILY_NAME, refQual, 1);
                  pending = inflight.incrementAndGet();
                  while (pending > MAX_INFLIGHT) {
                    LOG.info("Too many operations in flight, waiting");
                    Thread.sleep(INFLIGHT_PAUSE_MS);
                    pending = inflight.get();
                  }
                  final long incrStartTime = System.currentTimeMillis();
                  final CompletableFuture<Result> incrFuture = table.increment(increment);
                  addListener(incrFuture, (r, e) -> {
                    inflight.decrementAndGet();
                    if (e == null) {
                      output.getCounter(Counts.RPC_TIME_MS)
                        .increment(System.currentTimeMillis() - incrStartTime);
                      output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(increment.heapSize());
                    }
                  });
                } catch (IllegalArgumentException | URISyntaxException e) {
                  LOG.debug("Could not make a row key for URI " + refUri + ", ignoring", e);
                }
              }
            }
          }
        }
      }
    }
  }

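  /**
   * A SequenceFileInputFormat that does not split its input files, so each file written by the
   * Loader is processed by exactly one Verify mapper.
   */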
  public static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
    @Override
    protected boolean isSplitable(final JobContext context, final Path filename) {
      return false;
    }
  }

  public static class Verify extends Configured implements Tool {

    public static final Logger LOG = LoggerFactory.getLogger(Verify.class);
    public static final String USAGE = "Verify <inputDir>";

    int run(final Path inputDir) throws IOException, ClassNotFoundException, InterruptedException {
      Job job = Job.getInstance(getConf());
      job.setJobName(Verify.class.getName());
      job.setJarByClass(getClass());
      job.setMapperClass(VerifyMapper.class);
      job.setInputFormatClass(OneFilePerMapperSFIF.class);
      FileInputFormat.setInputPaths(job, inputDir);
      job.setOutputFormatClass(NullOutputFormat.class);
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(NullWritable.class);
      TableMapReduceUtil.addDependencyJars(job);

      boolean success = job.waitForCompletion(true);
      if (!success) {
        LOG.error("Failure during job " + job.getJobID());
      }

      final Counters counters = job.getCounters();
      for (Counts c : Counts.values()) {
        long value = counters.findCounter(c).getValue();
        if (value != 0) {
          LOG.info(c + ": " + value);
        }
      }
      if (counters.findCounter(Counts.UNREFERENCED).getValue() > 0) {
        LOG.error("Nonzero UNREFERENCED count from job " + job.getJobID());
        success = false;
      }
      if (counters.findCounter(Counts.CORRUPT).getValue() > 0) {
        LOG.error("Nonzero CORRUPT count from job " + job.getJobID());
        success = false;
      }

      return success ? 0 : 1;
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 1) {
        System.err.println(USAGE);
        return 1;
      }
      Path loaderOutput = new Path(args[0]);
      return run(loaderOutput);
    }

    public static void main(String[] args) throws Exception {
      System.exit(ToolRunner.run(HBaseConfiguration.create(), new Verify(), args));
    }

    public static class VerifyMapper
      extends Mapper<HBaseKeyWritable, BytesWritable, NullWritable, NullWritable> {

      protected Connection conn;
      protected Table table;

      @Override
      protected void setup(final Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        conn = ConnectionFactory.createConnection(conf);
        table = conn.getTable(getTablename(conf));
      }

      @Override
      protected void cleanup(final Context context) throws IOException, InterruptedException {
        // Close the table
        try {
          table.close();
        } catch (Exception e) {
          LOG.warn("Exception closing table", e);
        }
        // Close the connection
        try {
          conn.close();
        } catch (Exception e) {
          LOG.warn("Exception closing Connection", e);
        }
      }

      @Override
      protected void map(final HBaseKeyWritable key, final BytesWritable value,
        final Context output) throws IOException, InterruptedException {
        final byte[] row = Bytes.copy(key.getRowArray(), key.getRowOffset(), key.getRowLength());
        final byte[] family =
          Bytes.copy(key.getFamilyArray(), key.getFamilyOffset(), key.getFamilyLength());
        final byte[] qualifier =
          Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(), key.getQualifierLength());
        final long ts = key.getTimestamp();

        if (Bytes.equals(INFO_FAMILY_NAME, family) && Bytes.equals(CRC_QUALIFIER, qualifier)) {
          final long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
          final Get get = new Get(row).setTimestamp(ts).addFamily(CONTENT_FAMILY_NAME)
            .addFamily(INFO_FAMILY_NAME);
          final long startTime = System.currentTimeMillis();
          Result r;
          try {
            r = table.get(get);
            output.getCounter(Counts.RPC_TIME_MS).increment(System.currentTimeMillis() - startTime);
          } catch (Exception e) {
            LOG.error("Row " + Bytes.toStringBinary(row) + ": exception", e);
            output.getCounter(Counts.UNREFERENCED).increment(1);
            return;
          }
          final byte[] crcBytes = r.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
          if (crcBytes == null) {
            LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c");
            output.getCounter(Counts.UNREFERENCED).increment(1);
            return;
          }
          if (Bytes.toLong(crcBytes) != expectedCRC64) {
            LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
            output.getCounter(Counts.CORRUPT).increment(1);
            return;
          }
          // If we fell through to here all verification checks have succeeded for the info
          // record.
          output.getCounter(Counts.REFERENCED).increment(1);
          final byte[] content = r.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
          if (content == null) {
            LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content");
            output.getCounter(Counts.UNREFERENCED).increment(1);
            return;
          } else {
            final CRC64 crc = new CRC64();
            crc.update(content);
            if (crc.getValue() != expectedCRC64) {
              LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content");
              output.getCounter(Counts.CORRUPT).increment(1);
              return;
            }
          }
          // If we fell through to here all verification checks have succeeded for the content
          // record.
          output.getCounter(Counts.REFERENCED).increment(1);
        } else {
          final long startTime = System.currentTimeMillis();
          final Get get = new Get(row).setTimestamp(ts).addColumn(family, qualifier);
          Result r;
          try {
            r = table.get(get);
            output.getCounter(Counts.RPC_TIME_MS).increment(System.currentTimeMillis() - startTime);
          } catch (Exception e) {
            LOG.error("Row " + Bytes.toStringBinary(row) + ": exception", e);
            output.getCounter(Counts.UNREFERENCED).increment(1);
            return;
          }
          final byte[] bytes = r.getValue(family, qualifier);
          if (bytes == null) {
            LOG.error("Row " + Bytes.toStringBinary(row) + ": missing "
              + Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
            output.getCounter(Counts.UNREFERENCED).increment(1);
            return;
          }
          if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
            LOG.error("Row " + Bytes.toStringBinary(row) + ": " + Bytes.toStringBinary(family) + ":"
              + Bytes.toStringBinary(qualifier) + " mismatch");
            output.getCounter(Counts.CORRUPT).increment(1);
            return;
          }
          // If we fell through to here all verification checks have succeeded for the info
          // record.
          output.getCounter(Counts.REFERENCED).increment(1);
        }
      }
    }
  }

  private static final AtomicLong counter = new AtomicLong();
  private static final int shift = 8;

  // Build a timestamp from the current time in milliseconds shifted left by 8 bits, with the low
  // 8 bits taken from a rolling counter, allowing up to 256 distinct timestamps per millisecond.
  private static long getSequence() {
    long t = EnvironmentEdgeManager.currentTime();
    t <<= shift;
    t |= (counter.getAndIncrement() % (1 << shift));
    return t;
  }
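
  /**
   * Builds the row key for a target URI. The scheme is dropped, the host components are reversed,
   * the port (if present) follows a ':', and the path, query, and fragment follow a '|' separator.
   * For example, "http://www.example.com/index.html" produces the row key
   * "com.example.www|/index.html".
   */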
  private static byte[] rowKeyFromTargetURI(final String targetUri)
    throws IOException, URISyntaxException, IllegalArgumentException {
    final URI uri = new URI(targetUri);
    // Ignore the scheme
    // Reverse the components of the hostname
    String reversedHost;
    if (uri.getHost() != null) {
      final String[] hostComponents =
        Splitter.on('.').splitToStream(uri.getHost()).toArray(String[]::new);
      final StringBuilder sb = new StringBuilder();
      for (int i = hostComponents.length - 1; i >= 0; i--) {
        sb.append(hostComponents[i]);
        if (i != 0) {
          sb.append('.');
        }
      }
      reversedHost = sb.toString();
    } else {
      throw new IllegalArgumentException("URI is missing host component");
    }
    final ByteArrayOutputStream os = new ByteArrayOutputStream();
    os.write(reversedHost.getBytes(StandardCharsets.UTF_8));
    if (uri.getPort() >= 0) {
      os.write(String.format(":%d", uri.getPort()).getBytes(StandardCharsets.UTF_8));
    }
    os.write((byte) '|');
    if (uri.getPath() != null) {
      os.write(uri.getPath().getBytes(StandardCharsets.UTF_8));
    }
    if (uri.getQuery() != null) {
      os.write(String.format("?%s", uri.getQuery()).getBytes(StandardCharsets.UTF_8));
    }
    if (uri.getFragment() != null) {
      os.write(String.format("#%s", uri.getFragment()).getBytes(StandardCharsets.UTF_8));
    }
    if (os.size() > HConstants.MAX_ROW_LENGTH) {
      throw new IllegalArgumentException("Key would be too large (length=" + os.size()
        + ", limit=" + HConstants.MAX_ROW_LENGTH + ")");
    }
    return os.toByteArray();
  }

  static final Pattern URL_PATTERN = Pattern.compile(
    "\\b((https?|ftp|file)://|(www|ftp)\\.)"
      + "[\\-A-Z0-9+&@#/%?=~_|$!:,\\.;]*[A-Z0-9+&@#/%=~_|$]",
    Pattern.CASE_INSENSITIVE);

  private static Collection<String> extractUrls(byte[] content) {
    final Set<String> list = new HashSet<>(); // uniques
    final Matcher m = URL_PATTERN.matcher(new String(content, StandardCharsets.UTF_8));
    while (m.find()) {
      list.add(m.group());
    }
    return list;
  }
}