/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Iterate over an HBase table's data, returning (ImmutableBytesWritable, Result) pairs.
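 * <p>
 * A minimal usage sketch is shown below; the {@code table} handle and the column name are
 * illustrative assumptions, not part of this class:
 *
 * <pre>
 * TableRecordReaderImpl trr = new TableRecordReaderImpl();
 * trr.setHTable(table); // an open org.apache.hadoop.hbase.client.Table (assumed)
 * trr.setInputColumns(new byte[][] { Bytes.toBytes("info:name") }); // "family:qualifier"
 * trr.setStartRow(HConstants.EMPTY_START_ROW);
 * trr.setEndRow(HConstants.EMPTY_END_ROW);
 * trr.init(); // builds the scanner via restart(startRow)
 * ImmutableBytesWritable key = trr.createKey();
 * Result value = trr.createValue();
 * while (trr.next(key, value)) {
 *   // key now holds the row key, value the row's cells
 * }
 * trr.close();
 * </pre>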
 */
@InterfaceAudience.Public
public class TableRecordReaderImpl {
  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);

  private byte[] startRow;
  private byte[] endRow;
  private byte[] lastSuccessfulRow;
  private Filter trrRowFilter;
  private ResultScanner scanner;
  private Table htable;
  private byte[][] trrInputColumns;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   */
  public void restart(byte[] firstRow) throws IOException {
    Scan currentScan;
    // If an end row was supplied, bound the scan; otherwise scan to the end of the table.
    if ((endRow != null) && (endRow.length > 0)) {
      if (trrRowFilter != null) {
        Scan scan = new Scan(firstRow, endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        scan.setFilter(trrRowFilter);
        scan.setCacheBlocks(false);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      } else {
        LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + ", endRow: "
          + Bytes.toStringBinary(endRow));
        Scan scan = new Scan(firstRow, endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      }
    } else {
      LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + ", no endRow");

      Scan scan = new Scan(firstRow);
      TableInputFormat.addColumns(scan, trrInputColumns);
      scan.setFilter(trrRowFilter);
      this.scanner = this.htable.getScanner(scan);
      currentScan = scan;
    }
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = EnvironmentEdgeManager.currentTime();
      rowcount = 0;
    }
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   */
  public void init() throws IOException {
    restart(startRow);
  }

  byte[] getStartRow() {
    return this.startRow;
  }

  /**
   * @param htable the {@link Table} to scan.
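   * <p>
   * This reader picks up its scanner-activity logging settings from the table's configuration.
   * A minimal sketch of enabling them before the table is created (the setup code is an
   * assumption for illustration):
   *
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * // log scan progress every LOG_PER_ROW_COUNT rows (default 100)
   * conf.setBoolean("hbase.client.log.scanner.activity", true);
   * conf.setInt(LOG_PER_ROW_COUNT, 100);
   * // ... create the Connection and Table from this conf, then call setHTable(table) ...
   * </pre>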
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(
      "hbase.client.log.scanner.activity" /* ScannerCallable.LOG_SCANNER_ACTIVITY */, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * @param inputColumns the columns to be placed in {@link Result}.
   */
  public void setInputColumns(final byte[][] inputColumns) {
    this.trrInputColumns = inputColumns;
  }

  /**
   * @param startRow the first row in the split
   */
  public void setStartRow(final byte[] startRow) {
    this.startRow = startRow;
  }

  /**
   * @param endRow the last row in the split
   */
  public void setEndRow(final byte[] endRow) {
    this.endRow = endRow;
  }

  /**
   * @param rowFilter the {@link Filter} to be used.
   */
  public void setRowFilter(Filter rowFilter) {
    this.trrRowFilter = rowFilter;
  }

  /**
   * Close the scanner and the underlying table.
   */
  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
    }
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * @see org.apache.hadoop.mapred.RecordReader#createKey()
   */
  public ImmutableBytesWritable createKey() {
    return new ImmutableBytesWritable();
  }

  /**
   * @see org.apache.hadoop.mapred.RecordReader#createValue()
   */
  public Result createValue() {
    return new Result();
  }

  public long getPos() {
    // This should be the ordinal tuple in the range;
    // not clear how to calculate...
    return 0;
  }

  public float getProgress() {
    // Depends on the total number of tuples and getPos
    return 0;
  }

  /**
   * @param key   ImmutableBytesWritable to fill with the current row key
   * @param value Result to fill with the current row's data
   * @return true if there was more data
   */
  public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
    Result result;
    try {
      try {
        result = this.scanner.next();
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = EnvironmentEdgeManager.currentTime();
            LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // do not retry if the exception tells us not to do so
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }
        // try to handle all other IOExceptions by restarting
        // the scanner; if the second call fails, it will be rethrown
        LOG.debug("recovered from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation;"
            + " if your mapper has restarted a few other times like this,"
            + " consider killing this job and investigating"
            + " why it is taking so long.");
          restart(startRow);
        } else {
          restart(lastSuccessfulRow);
          this.scanner.next(); // skip presumed already mapped row
        }
        result = this.scanner.next();
      }

      if (result != null && result.size() > 0) {
        key.set(result.getRow());
        lastSuccessfulRow = key.get();
        value.copyFrom(result);
        return true;
      }
      return false;
    } catch (IOException ioe) {
      if (logScannerActivity) {
        long now = EnvironmentEdgeManager.currentTime();
        LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
        LOG.info(ioe.toString(), ioe);
        String lastRow =
          lastSuccessfulRow == null ? "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }
}