001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import java.util.Arrays;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.Scan;
027import org.apache.hadoop.hbase.util.Bytes;
028import org.apache.hadoop.io.Writable;
029import org.apache.hadoop.io.WritableUtils;
030import org.apache.hadoop.mapreduce.InputSplit;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * A table split corresponds to a key range (low, high) and an optional scanner. All references to
037 * row below refer to the key of the row.
038 */
039@InterfaceAudience.Public
040public class TableSplit extends InputSplit implements Writable, Comparable<TableSplit> {
041
042  private static final Logger LOG = LoggerFactory.getLogger(TableSplit.class);
043
044  // should be < 0 (@see #readFields(DataInput))
045  // version 1 supports Scan data member
046  enum Version {
047    UNVERSIONED(0),
048    // Initial number we put on TableSplit when we introduced versioning.
049    INITIAL(-1),
050    // Added an encoded region name field for easier identification of split -> region
051    WITH_ENCODED_REGION_NAME(-2);
052
053    final int code;
054    static final Version[] byCode;
055    static {
056      byCode = Version.values();
057      for (int i = 0; i < byCode.length; i++) {
058        if (byCode[i].code != -1 * i) {
059          throw new AssertionError("Values in this enum should be descending by one");
060        }
061      }
062    }
063
064    Version(int code) {
065      this.code = code;
066    }
067
068    boolean atLeast(Version other) {
069      return code <= other.code;
070    }
071
072    static Version fromCode(int code) {
073      return byCode[code * -1];
074    }
075  }
076
077  private static final Version VERSION = Version.WITH_ENCODED_REGION_NAME;
078  private TableName tableName;
079  private byte[] startRow;
080  private byte[] endRow;
081  private String regionLocation;
082  private String encodedRegionName = "";
083
084  /**
085   * The scan object may be null but the serialized form of scan is never null or empty since we
086   * serialize the scan object with default values then. Having no scanner in TableSplit doesn't
087   * necessarily mean there is no scanner for mapreduce job, it just means that we do not need to
088   * set it for each split. For example, it is not required to have a scan object for
089   * {@link org.apache.hadoop.hbase.mapred.TableInputFormatBase} since we use the scan from the job
090   * conf and scanner is supposed to be same for all the splits of table.
091   */
092  private String scan = ""; // stores the serialized form of the Scan
093  private long length; // Contains estimation of region size in bytes
094
095  /** Default constructor. */
096  public TableSplit() {
097    this((TableName) null, null, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, "");
098  }
099
100  /**
101   * Creates a new instance while assigning all variables. Length of region is set to 0 Encoded name
102   * of the region is set to blank
103   * @param tableName The name of the current table.
104   * @param scan      The scan associated with this split.
105   * @param startRow  The start row of the split.
106   * @param endRow    The end row of the split.
107   * @param location  The location of the region.
108   */
109  public TableSplit(TableName tableName, Scan scan, byte[] startRow, byte[] endRow,
110    final String location) {
111    this(tableName, scan, startRow, endRow, location, 0L);
112  }
113
114  /**
115   * Creates a new instance while assigning all variables. Encoded name of region is set to blank
116   * @param tableName The name of the current table.
117   * @param scan      The scan associated with this split.
118   * @param startRow  The start row of the split.
119   * @param endRow    The end row of the split.
120   * @param location  The location of the region.
121   */
122  public TableSplit(TableName tableName, Scan scan, byte[] startRow, byte[] endRow,
123    final String location, long length) {
124    this(tableName, scan, startRow, endRow, location, "", length);
125  }
126
127  /**
128   * Creates a new instance while assigning all variables.
129   * @param tableName         The name of the current table.
130   * @param scan              The scan associated with this split.
131   * @param startRow          The start row of the split.
132   * @param endRow            The end row of the split.
133   * @param encodedRegionName The region ID.
134   * @param location          The location of the region.
135   */
136  public TableSplit(TableName tableName, Scan scan, byte[] startRow, byte[] endRow,
137    final String location, final String encodedRegionName, long length) {
138    this.tableName = tableName;
139    try {
140      this.scan = (null == scan) ? "" : TableMapReduceUtil.convertScanToString(scan);
141    } catch (IOException e) {
142      LOG.warn("Failed to convert Scan to String", e);
143    }
144    this.startRow = startRow;
145    this.endRow = endRow;
146    this.regionLocation = location;
147    this.encodedRegionName = encodedRegionName;
148    this.length = length;
149  }
150
151  /**
152   * Creates a new instance without a scanner. Length of region is set to 0
153   * @param tableName The name of the current table.
154   * @param startRow  The start row of the split.
155   * @param endRow    The end row of the split.
156   * @param location  The location of the region.
157   */
158  public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, final String location) {
159    this(tableName, null, startRow, endRow, location);
160  }
161
162  /**
163   * Creates a new instance without a scanner.
164   * @param tableName The name of the current table.
165   * @param startRow  The start row of the split.
166   * @param endRow    The end row of the split.
167   * @param location  The location of the region.
168   * @param length    Size of region in bytes
169   */
170  public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, final String location,
171    long length) {
172    this(tableName, null, startRow, endRow, location, length);
173  }
174
175  /**
176   * Returns a Scan object from the stored string representation.
177   * @return Returns a Scan object based on the stored scanner.
178   * @throws IOException throws IOException if deserialization fails
179   */
180  public Scan getScan() throws IOException {
181    return TableMapReduceUtil.convertStringToScan(this.scan);
182  }
183
184  /**
185   * Returns a scan string
186   * @return scan as string. Should be noted that this is not same as getScan().toString() because
187   *         Scan object will have the default values when empty scan string is deserialized. Thus,
188   *         getScan().toString() can never be empty
189   */
190  @InterfaceAudience.Private
191  public String getScanAsString() {
192    return this.scan;
193  }
194
195  /**
196   * Returns the table name converted to a byte array.
197   * @see #getTable()
198   * @return The table name.
199   */
200  public byte[] getTableName() {
201    return tableName.getName();
202  }
203
204  /**
205   * Returns the table name.
206   * @return The table name.
207   */
208  public TableName getTable() {
209    // It is ugly that usually to get a TableName, the method is called getTableName. We can't do
210    // that in here though because there was an existing getTableName in place already since
211    // deprecated.
212    return tableName;
213  }
214
215  /**
216   * Returns the start row.
217   * @return The start row.
218   */
219  public byte[] getStartRow() {
220    return startRow;
221  }
222
223  /**
224   * Returns the end row.
225   * @return The end row.
226   */
227  public byte[] getEndRow() {
228    return endRow;
229  }
230
231  /**
232   * Returns the region location.
233   * @return The region's location.
234   */
235  public String getRegionLocation() {
236    return regionLocation;
237  }
238
239  /**
240   * Returns the region's location as an array.
241   * @return The array containing the region location.
242   * @see org.apache.hadoop.mapreduce.InputSplit#getLocations()
243   */
244  @Override
245  public String[] getLocations() {
246    return new String[] { regionLocation };
247  }
248
249  /**
250   * Returns the region's encoded name.
251   * @return The region's encoded name.
252   */
253  public String getEncodedRegionName() {
254    return encodedRegionName;
255  }
256
257  /**
258   * Returns the length of the split.
259   * @return The length of the split.
260   * @see org.apache.hadoop.mapreduce.InputSplit#getLength()
261   */
262  @Override
263  public long getLength() {
264    return length;
265  }
266
267  /**
268   * Reads the values of each field.
269   * @param in The input to read from.
270   * @throws IOException When reading the input fails.
271   */
272  @Override
273  public void readFields(DataInput in) throws IOException {
274    Version version = Version.UNVERSIONED;
275    // TableSplit was not versioned in the beginning.
276    // In order to introduce it now, we make use of the fact
277    // that tableName was written with Bytes.writeByteArray,
278    // which encodes the array length as a vint which is >= 0.
279    // Hence if the vint is >= 0 we have an old version and the vint
280    // encodes the length of tableName.
281    // If < 0 we just read the version and the next vint is the length.
282    // @see Bytes#readByteArray(DataInput)
283    int len = WritableUtils.readVInt(in);
284    if (len < 0) {
285      // what we just read was the version
286      version = Version.fromCode(len);
287      len = WritableUtils.readVInt(in);
288    }
289    byte[] tableNameBytes = new byte[len];
290    in.readFully(tableNameBytes);
291    tableName = TableName.valueOf(tableNameBytes);
292    startRow = Bytes.readByteArray(in);
293    endRow = Bytes.readByteArray(in);
294    regionLocation = Bytes.toString(Bytes.readByteArray(in));
295    if (version.atLeast(Version.INITIAL)) {
296      scan = Bytes.toString(Bytes.readByteArray(in));
297    }
298    length = WritableUtils.readVLong(in);
299    if (version.atLeast(Version.WITH_ENCODED_REGION_NAME)) {
300      encodedRegionName = Bytes.toString(Bytes.readByteArray(in));
301    }
302  }
303
304  /**
305   * Writes the field values to the output.
306   * @param out The output to write to.
307   * @throws IOException When writing the values to the output fails.
308   */
309  @Override
310  public void write(DataOutput out) throws IOException {
311    WritableUtils.writeVInt(out, VERSION.code);
312    Bytes.writeByteArray(out, tableName.getName());
313    Bytes.writeByteArray(out, startRow);
314    Bytes.writeByteArray(out, endRow);
315    Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
316    Bytes.writeByteArray(out, Bytes.toBytes(scan));
317    WritableUtils.writeVLong(out, length);
318    Bytes.writeByteArray(out, Bytes.toBytes(encodedRegionName));
319  }
320
321  /**
322   * Returns the details about this instance as a string.
323   * @return The values of this instance as a string.
324   * @see java.lang.Object#toString()
325   */
326  @Override
327  public String toString() {
328    StringBuilder sb = new StringBuilder();
329    sb.append("Split(");
330    sb.append("tablename=").append(tableName);
331    // null scan input is represented by ""
332    String printScan = "";
333    if (!scan.equals("")) {
334      try {
335        // get the real scan here in toString, not the Base64 string
336        printScan = TableMapReduceUtil.convertStringToScan(scan).toString();
337      } catch (IOException e) {
338        printScan = "";
339      }
340      sb.append(", scan=").append(printScan);
341    }
342    sb.append(", startrow=").append(Bytes.toStringBinary(startRow));
343    sb.append(", endrow=").append(Bytes.toStringBinary(endRow));
344    sb.append(", regionLocation=").append(regionLocation);
345    sb.append(", regionname=").append(encodedRegionName);
346    sb.append(")");
347    return sb.toString();
348  }
349
350  /**
351   * Compares this split against the given one.
352   * @param split The split to compare to.
353   * @return The result of the comparison.
354   * @see java.lang.Comparable#compareTo(java.lang.Object)
355   */
356  @Override
357  public int compareTo(TableSplit split) {
358    // If The table name of the two splits is the same then compare start row
359    // otherwise compare based on table names
360    int tableNameComparison = getTable().compareTo(split.getTable());
361    return tableNameComparison != 0
362      ? tableNameComparison
363      : Bytes.compareTo(getStartRow(), split.getStartRow());
364  }
365
366  @Override
367  public boolean equals(Object o) {
368    if (o == null || !(o instanceof TableSplit)) {
369      return false;
370    }
371    return tableName.equals(((TableSplit) o).tableName)
372      && Bytes.equals(startRow, ((TableSplit) o).startRow)
373      && Bytes.equals(endRow, ((TableSplit) o).endRow)
374      && regionLocation.equals(((TableSplit) o).regionLocation);
375  }
376
377  @Override
378  public int hashCode() {
379    int result = tableName != null ? tableName.hashCode() : 0;
380    result = 31 * result + (scan != null ? scan.hashCode() : 0);
381    result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0);
382    result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0);
383    result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0);
384    result = 31 * result + (encodedRegionName != null ? encodedRegionName.hashCode() : 0);
385    return result;
386  }
387}