Class RoundRobinTableInputFormat

java.lang.Object
org.apache.hadoop.mapreduce.InputFormat<ImmutableBytesWritable,Result>
All Implemented Interfaces:
org.apache.hadoop.conf.Configurable

@Public public class RoundRobinTableInputFormat extends TableInputFormat
Processes the return from the super-class TableInputFormat (TIF) to undo any clumping of InputSplits around RegionServers, spreading splits broadly to distribute read load over the RegionServers in the cluster. The super-class TIF returns splits in hbase:meta table order. Adjacent or near-adjacent hbase:meta Regions can be hosted on the same RegionServer; nothing prevents this. This hbase:meta ordering of InputSplit placement can be lumpy, so that some RegionServers end up hosting many InputSplit scans while, at the same time, other RegionServers host few or none. This class does a pass over the return from the super-class to better spread the load. See the Flipkart blog post below for a description; the base of this code comes from that post (with permission).
  • Field Details

    • hbaseRegionsizecalculatorEnableOriginalValue

    • HBASE_REGIONSIZECALCULATOR_ENABLE

      Boolean config for whether the super-class should produce InputSplits with 'lengths'. If true, TIF will query every RegionServer to get the 'size' of all involved Regions, and this 'size' will be used as the InputSplit length. If false, we skip this query and the InputSplits returned by the super-class will have lengths of zero. This override sets the flag to false, so all returned lengths will be zero. That makes sorting on 'length' a no-op, so the ordering returned by this override prevails. That's what we want.
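      The effect of zero lengths on a size-first sort can be illustrated with a minimal sketch. It assumes the sort is stable, as java.util.List.sort is; SplitStub and ZeroLengthSortSketch are hypothetical stand-ins invented for this illustration, not HBase or Hadoop types.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for an InputSplit: a label plus a length. */
final class SplitStub {
  final String label;
  final long length;

  SplitStub(String label, long length) {
    this.label = label;
    this.length = length;
  }
}

final class ZeroLengthSortSketch {
  /** Sorts a copy biggest-first and returns the labels in the resulting order. */
  static List<String> sortBySizeDescending(List<SplitStub> splits) {
    List<SplitStub> copy = new ArrayList<>(splits);
    // List.sort is stable: elements with equal keys keep their relative order.
    copy.sort(Comparator.comparingLong((SplitStub s) -> s.length).reversed());
    List<String> labels = new ArrayList<>();
    for (SplitStub s : copy) {
      labels.add(s.label);
    }
    return labels;
  }
}
```

      With every length equal to zero, the comparator never distinguishes two splits, so a stable sort returns them in the order they were given.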
  • Constructor Details

  • Method Details

    • getSplits

      public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext context) throws IOException
      Description copied from class: TableInputFormat
      Calculates the splits that will serve as input for the map tasks. The number of splits matches the number of regions in a table. Splits are shuffled if required.
      Overrides:
      getSplits in class TableInputFormat
      Parameters:
      context - The current job context.
      Returns:
      The list of input splits.
      Throws:
      IOException - When creating the list of splits fails.
      See Also:
      • InputFormat.getSplits(org.apache.hadoop.mapreduce.JobContext)
    • getSuperSplits

      List<org.apache.hadoop.mapreduce.InputSplit> getSuperSplits(org.apache.hadoop.mapreduce.JobContext context) throws IOException
      Calls the super-class's getSplits. It is factored out here as its own method so that it can be overridden.
      Throws:
      IOException
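      One reason to keep this call in its own method is that a subclass (for example, in a test) can supply canned splits without a cluster. The sketch below is an assumption about how the methods on this page might fit together (configure, then getSuperSplits, then roundRobin, with unconfigure in a finally block); the page itself does not state this order. SplitPipelineSketch and CannedSplitsSketch are hypothetical classes that use strings in place of InputSplits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in that mirrors the method names on this page. */
class SplitPipelineSketch {
  /** Records the order in which the steps ran. */
  final List<String> calls = new ArrayList<>();

  List<String> getSplits() {
    try {
      configure();
      return roundRobin(getSuperSplits());
    } finally {
      // Assumed: the original setting is restored even if a step throws.
      unconfigure();
    }
  }

  void configure() {
    calls.add("configure");
  }

  void unconfigure() {
    calls.add("unconfigure");
  }

  /** The overridable seam; a real implementation would ask the super-class. */
  List<String> getSuperSplits() {
    calls.add("getSuperSplits");
    return new ArrayList<>();
  }

  /** Placeholder: returns the input unchanged. */
  List<String> roundRobin(List<String> inputs) {
    calls.add("roundRobin");
    return inputs;
  }
}

/** Overrides only the seam, so no cluster is needed to drive getSplits. */
class CannedSplitsSketch extends SplitPipelineSketch {
  @Override
  List<String> getSuperSplits() {
    calls.add("getSuperSplits");
    return Arrays.asList("split-1", "split-2");
  }
}
```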
    • roundRobin

      List<org.apache.hadoop.mapreduce.InputSplit> roundRobin(List<org.apache.hadoop.mapreduce.InputSplit> inputs) throws IOException
      Spread the splits list so as to avoid clumping on RegionServers. Order splits so every server gets one split before a server gets a second, and so on; i.e. round-robin the splits amongst the servers in the cluster.
      Throws:
      IOException
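      The ordering described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the class's actual implementation: each input is a hypothetical {server, splitName} string pair standing in for an InputSplit and its location, and RoundRobinSketch is a name invented for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RoundRobinSketch {
  /** Each input is {server, splitName}; returns split names in round-robin order. */
  static List<String> roundRobin(List<String[]> inputs) {
    // Group splits by server, keeping first-seen server order and per-server split order.
    Map<String, Deque<String>> byServer = new LinkedHashMap<>();
    for (String[] in : inputs) {
      byServer.computeIfAbsent(in[0], k -> new ArrayDeque<>()).add(in[1]);
    }
    List<String> result = new ArrayList<>(inputs.size());
    // Take one split from each server per pass until every queue is drained.
    while (!byServer.isEmpty()) {
      Iterator<Deque<String>> it = byServer.values().iterator();
      while (it.hasNext()) {
        Deque<String> queue = it.next();
        result.add(queue.poll());
        if (queue.isEmpty()) {
          it.remove(); // this server has no splits left
        }
      }
    }
    return result;
  }
}
```

      Given three splits on one server followed by one on a second and two on a third, the first pass emits one split per server before any server contributes a second.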
    • configure

      void configure()
      Adds a configuration to the Context that disables the remote RPCs used to figure Region size when calculating InputSplits. The super-class TIF RPCs to every server to find the size of all involved Regions; here we disable that super-class action, so InputSplits will have a length of zero. If all InputSplits are zero-length, the ordering done in here will pass through Hadoop's length-first sort unchanged.

      The super-class TIF asks every node for the current size of each participating Table Region because it wants to schedule the biggest Regions first (this fixation comes from Hadoop itself; see JobSubmitter, where it sorts inputs by size). This extra diligence takes time and is of no utility in this RRTIF, where spread matters more than size-first ordering.

      Also, if a rolling restart is happening when we go to launch the job, the launch may fail because the request for Region size fails, even after retries, since a rolled RegionServer may take a while to come back online: e.g. it takes Java 90 seconds to allocate 160G. The RegionServer is offline during this time, and the job launch fails with 'Connection rejected'. So we set 'hbase.regionsizecalculator.enable' to false here in RRTIF.
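      The save, override, and restore pattern implied by configure() and unconfigure() can be sketched as below. This is an illustration under assumptions: a plain Map stands in for the job configuration (it is not the Hadoop Configuration API), ConfigureSketch is a hypothetical class, and the restore-or-remove behaviour of unconfigure() is a guess, since this page does not describe it.

```java
import java.util.Map;

final class ConfigureSketch {
  static final String HBASE_REGIONSIZECALCULATOR_ENABLE = "hbase.regionsizecalculator.enable";

  // Plain map standing in for the job configuration (assumption).
  private final Map<String, String> conf;
  // Value found before configure() ran; null means the key was not set.
  private String originalValue;

  ConfigureSketch(Map<String, String> conf) {
    this.conf = conf;
  }

  /** Remembers the current setting, then disables the Region size calculation. */
  void configure() {
    originalValue = conf.get(HBASE_REGIONSIZECALCULATOR_ENABLE);
    conf.put(HBASE_REGIONSIZECALCULATOR_ENABLE, "false");
  }

  /** Puts the setting back the way configure() found it. */
  void unconfigure() {
    if (originalValue == null) {
      conf.remove(HBASE_REGIONSIZECALCULATOR_ENABLE);
    } else {
      conf.put(HBASE_REGIONSIZECALCULATOR_ENABLE, originalValue);
    }
  }
}
```

      Restoring the earlier value keeps the override scoped to split calculation, so other users of the same configuration see the setting they started with.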
    • unconfigure

      void unconfigure()
    • main

      public static void main(String[] args) throws IOException
      Pass the table name as the argument. Set the ZooKeeper ensemble to use with the System property 'hbase.zookeeper.quorum'.
      Throws:
      IOException