001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.text.MessageFormat;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import org.apache.hadoop.hbase.HRegionInfo;
028import org.apache.hadoop.hbase.HRegionLocation;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.ConnectionFactory;
032import org.apache.hadoop.hbase.client.RegionLocator;
033import org.apache.hadoop.hbase.client.Result;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.util.Pair;
039import org.apache.hadoop.mapreduce.InputFormat;
040import org.apache.hadoop.mapreduce.InputSplit;
041import org.apache.hadoop.mapreduce.JobContext;
042import org.apache.hadoop.mapreduce.RecordReader;
043import org.apache.hadoop.mapreduce.TaskAttemptContext;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * A base for {@link MultiTableInputFormat}s. Receives a list of {@link Scan} instances that define
050 * the input tables and filters etc. Subclasses may use other TableRecordReader implementations.
051 */
052@InterfaceAudience.Public
053public abstract class MultiTableInputFormatBase
054  extends InputFormat<ImmutableBytesWritable, Result> {
055
056  private static final Logger LOG = LoggerFactory.getLogger(MultiTableInputFormatBase.class);
057
058  /** Holds the set of scans used to define the input. */
059  private List<Scan> scans;
060
061  /** The reader scanning the table, can be a custom one. */
062  private TableRecordReader tableRecordReader = null;
063
064  /**
065   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the default.
066   * @param split   The split to work with.
067   * @param context The current context.
068   * @return The newly created record reader.
069   * @throws IOException          When creating the reader fails.
070   * @throws InterruptedException when record reader initialization fails
071   * @see InputFormat#createRecordReader(InputSplit, TaskAttemptContext)
072   */
073  @Override
074  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split,
075    TaskAttemptContext context) throws IOException, InterruptedException {
076    TableSplit tSplit = (TableSplit) split;
077    LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));
078
079    if (tSplit.getTable() == null) {
080      throw new IOException("Cannot create a record reader because of a"
081        + " previous error. Please look at the previous logs lines from"
082        + " the task's full log for more details.");
083    }
084    final Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
085    Table table = connection.getTable(tSplit.getTable());
086
087    if (this.tableRecordReader == null) {
088      this.tableRecordReader = new TableRecordReader();
089    }
090    final TableRecordReader trr = this.tableRecordReader;
091
092    try {
093      Scan sc = tSplit.getScan();
094      sc.setStartRow(tSplit.getStartRow());
095      sc.setStopRow(tSplit.getEndRow());
096      trr.setScan(sc);
097      trr.setTable(table);
098      return new RecordReader<ImmutableBytesWritable, Result>() {
099
100        @Override
101        public void close() throws IOException {
102          trr.close();
103          connection.close();
104        }
105
106        @Override
107        public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
108          return trr.getCurrentKey();
109        }
110
111        @Override
112        public Result getCurrentValue() throws IOException, InterruptedException {
113          return trr.getCurrentValue();
114        }
115
116        @Override
117        public float getProgress() throws IOException, InterruptedException {
118          return trr.getProgress();
119        }
120
121        @Override
122        public void initialize(InputSplit inputsplit, TaskAttemptContext context)
123          throws IOException, InterruptedException {
124          trr.initialize(inputsplit, context);
125        }
126
127        @Override
128        public boolean nextKeyValue() throws IOException, InterruptedException {
129          return trr.nextKeyValue();
130        }
131      };
132    } catch (IOException ioe) {
133      // If there is an exception make sure that all
134      // resources are closed and released.
135      trr.close();
136      connection.close();
137      throw ioe;
138    }
139  }
140
141  /**
142   * Calculates the splits that will serve as input for the map tasks. The number of splits matches
143   * the number of regions in a table.
144   * @param context The current job context.
145   * @return The list of input splits.
146   * @throws IOException When creating the list of splits fails.
147   * @see InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
148   */
149  @Override
150  public List<InputSplit> getSplits(JobContext context) throws IOException {
151    if (scans.isEmpty()) {
152      throw new IOException("No scans were provided.");
153    }
154
155    Map<TableName, List<Scan>> tableMaps = new HashMap<>();
156    for (Scan scan : scans) {
157      byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
158      if (tableNameBytes == null) throw new IOException("A scan object did not have a table name");
159
160      TableName tableName = TableName.valueOf(tableNameBytes);
161
162      List<Scan> scanList = tableMaps.get(tableName);
163      if (scanList == null) {
164        scanList = new ArrayList<>();
165        tableMaps.put(tableName, scanList);
166      }
167      scanList.add(scan);
168    }
169
170    List<InputSplit> splits = new ArrayList<>();
171    Iterator iter = tableMaps.entrySet().iterator();
172    // Make a single Connection to the Cluster and use it across all tables.
173    try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration())) {
174      while (iter.hasNext()) {
175        Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next();
176        TableName tableName = entry.getKey();
177        List<Scan> scanList = entry.getValue();
178        try (Table table = conn.getTable(tableName);
179          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
180          RegionSizeCalculator sizeCalculator =
181            new RegionSizeCalculator(regionLocator, conn.getAdmin());
182          Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
183          for (Scan scan : scanList) {
184            if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
185              throw new IOException(
186                "Expecting at least one region for table : " + tableName.getNameAsString());
187            }
188            int count = 0;
189
190            byte[] startRow = scan.getStartRow();
191            byte[] stopRow = scan.getStopRow();
192
193            for (int i = 0; i < keys.getFirst().length; i++) {
194              if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
195                continue;
196              }
197
198              if (
199                (startRow.length == 0 || keys.getSecond()[i].length == 0
200                  || Bytes.compareTo(startRow, keys.getSecond()[i]) < 0)
201                  && (stopRow.length == 0 || Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)
202              ) {
203                byte[] splitStart =
204                  startRow.length == 0 || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0
205                    ? keys.getFirst()[i]
206                    : startRow;
207                byte[] splitStop =
208                  (stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0)
209                    && keys.getSecond()[i].length > 0 ? keys.getSecond()[i] : stopRow;
210
211                HRegionLocation hregionLocation =
212                  regionLocator.getRegionLocation(keys.getFirst()[i], false);
213                String regionHostname = hregionLocation.getHostname();
214                HRegionInfo regionInfo = hregionLocation.getRegionInfo();
215                String encodedRegionName = regionInfo.getEncodedName();
216                long regionSize = sizeCalculator.getRegionSize(regionInfo.getRegionName());
217
218                TableSplit split = new TableSplit(table.getName(), scan, splitStart, splitStop,
219                  regionHostname, encodedRegionName, regionSize);
220
221                splits.add(split);
222
223                if (LOG.isDebugEnabled()) {
224                  LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
225                }
226              }
227            }
228          }
229        }
230      }
231    }
232
233    return splits;
234  }
235
236  /**
237   * Test if the given region is to be included in the InputSplit while splitting the regions of a
238   * table.
239   * <p>
240   * This optimization is effective when there is a specific reasoning to exclude an entire region
241   * from the M-R job, (and hence, not contributing to the InputSplit), given the start and end keys
242   * of the same. <br>
243   * Useful when we need to remember the last-processed top record and revisit the [last, current)
244   * interval for M-R processing, continuously. In addition to reducing InputSplits, reduces the
245   * load on the region server as well, due to the ordering of the keys. <br>
246   * <br>
247   * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region.
248   * <br>
249   * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no
250   * region is excluded( i.e. all regions are included).
251   * @param startKey Start key of the region
252   * @param endKey   End key of the region
253   * @return true, if this region needs to be included as part of the input (default).
254   */
255  protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
256    return true;
257  }
258
259  /**
260   * Allows subclasses to get the list of {@link Scan} objects.
261   */
262  protected List<Scan> getScans() {
263    return this.scans;
264  }
265
266  /**
267   * Allows subclasses to set the list of {@link Scan} objects.
268   * @param scans The list of {@link Scan} used to define the input
269   */
270  protected void setScans(List<Scan> scans) {
271    this.scans = scans;
272  }
273
274  /**
275   * Allows subclasses to set the {@link TableRecordReader}.
276   * @param tableRecordReader A different {@link TableRecordReader} implementation.
277   */
278  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
279    this.tableRecordReader = tableRecordReader;
280  }
281}