001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
021import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
022import static org.apache.hadoop.hbase.thrift.Constants.COALESCE_INC_KEY;
023import static org.apache.hadoop.hbase.util.Bytes.getBytes;
024
025import java.io.IOException;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.TreeMap;
033import java.util.concurrent.TimeUnit;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.CellBuilder;
037import org.apache.hadoop.hbase.CellBuilderFactory;
038import org.apache.hadoop.hbase.CellBuilderType;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.DoNotRetryIOException;
041import org.apache.hadoop.hbase.HColumnDescriptor;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.HRegionLocation;
044import org.apache.hadoop.hbase.HTableDescriptor;
045import org.apache.hadoop.hbase.KeyValue;
046import org.apache.hadoop.hbase.MetaTableAccessor;
047import org.apache.hadoop.hbase.ServerName;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.TableNotFoundException;
050import org.apache.hadoop.hbase.client.Append;
051import org.apache.hadoop.hbase.client.Delete;
052import org.apache.hadoop.hbase.client.Durability;
053import org.apache.hadoop.hbase.client.Get;
054import org.apache.hadoop.hbase.client.Increment;
055import org.apache.hadoop.hbase.client.OperationWithAttributes;
056import org.apache.hadoop.hbase.client.Put;
057import org.apache.hadoop.hbase.client.RegionInfo;
058import org.apache.hadoop.hbase.client.RegionLocator;
059import org.apache.hadoop.hbase.client.Result;
060import org.apache.hadoop.hbase.client.ResultScanner;
061import org.apache.hadoop.hbase.client.Scan;
062import org.apache.hadoop.hbase.client.Table;
063import org.apache.hadoop.hbase.filter.Filter;
064import org.apache.hadoop.hbase.filter.ParseFilter;
065import org.apache.hadoop.hbase.filter.PrefixFilter;
066import org.apache.hadoop.hbase.filter.WhileMatchFilter;
067import org.apache.hadoop.hbase.security.UserProvider;
068import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
069import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
070import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
071import org.apache.hadoop.hbase.thrift.generated.Hbase;
072import org.apache.hadoop.hbase.thrift.generated.IOError;
073import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
074import org.apache.hadoop.hbase.thrift.generated.Mutation;
075import org.apache.hadoop.hbase.thrift.generated.TAppend;
076import org.apache.hadoop.hbase.thrift.generated.TCell;
077import org.apache.hadoop.hbase.thrift.generated.TIncrement;
078import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
079import org.apache.hadoop.hbase.thrift.generated.TRowResult;
080import org.apache.hadoop.hbase.thrift.generated.TScan;
081import org.apache.hadoop.hbase.thrift.generated.TThriftServerType;
082import org.apache.hadoop.hbase.util.Bytes;
083import org.apache.thrift.TException;
084import org.apache.yetus.audience.InterfaceAudience;
085import org.slf4j.Logger;
086import org.slf4j.LoggerFactory;
087
088import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
089import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
090import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
091
092/**
 * The ThriftHBaseServiceHandler is a glue object that connects Thrift RPC calls to the HBase
 * client API primarily defined in the Admin and Table objects.
095 */
096@InterfaceAudience.Private
097@SuppressWarnings("deprecation")
098public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements Hbase.Iface {
  /** Logger shared by all instances of this handler. */
  private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);

  /** Version number written into every {@link TRegionInfo} built by this handler. */
  public static final int HREGION_VERSION = 1;

  // nextScannerId and scannerMap are used to manage scanner state
  // nextScannerId: the ID that the next registered scanner will receive.
  private int nextScannerId = 0;
  // scannerMap: scanner ID -> open scanner; built in the constructor with access-based expiry.
  private Cache<Integer, ResultScannerWrapper> scannerMap;
  // Created in the constructor with this handler as its argument.
  IncrementCoalescer coalescer;
107
108  /**
109   * Returns a list of all the column families for a given Table.
110   * @param table table
111   */
112  byte[][] getAllColumns(Table table) throws IOException {
113    HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
114    byte[][] columns = new byte[cds.length][];
115    for (int i = 0; i < cds.length; i++) {
116      columns[i] = Bytes.add(cds[i].getName(), KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
117    }
118    return columns;
119  }
120
121  /**
122   * Assigns a unique ID to the scanner and adds the mapping to an internal hash-map.
123   * @param scanner the {@link ResultScanner} to add
124   * @return integer scanner id
125   */
126  protected synchronized int addScanner(ResultScanner scanner, boolean sortColumns) {
127    int id = nextScannerId++;
128    ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
129    scannerMap.put(id, resultScannerWrapper);
130    return id;
131  }
132
133  /**
134   * Returns the scanner associated with the specified ID.
135   * @param id the ID of the scanner to get
136   * @return a Scanner, or null if ID was invalid.
137   */
138  private synchronized ResultScannerWrapper getScanner(int id) {
139    return scannerMap.getIfPresent(id);
140  }
141
142  /**
143   * Removes the scanner associated with the specified ID from the internal id-&gt;scanner hash-map.
144   * @param id the ID of the scanner to remove
145   */
146  private synchronized void removeScanner(int id) {
147    scannerMap.invalidate(id);
148  }
149
150  protected ThriftHBaseServiceHandler(final Configuration c, final UserProvider userProvider)
151    throws IOException {
152    super(c, userProvider);
153    long cacheTimeout =
154      c.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)
155        * 2;
156
157    scannerMap =
158      CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS).build();
159
160    this.coalescer = new IncrementCoalescer(this);
161  }
162
163  @Override
164  public void enableTable(ByteBuffer tableName) throws IOError {
165    try {
166      getAdmin().enableTable(getTableName(tableName));
167    } catch (IOException e) {
168      LOG.warn(e.getMessage(), e);
169      throw getIOError(e);
170    }
171  }
172
173  @Override
174  public void disableTable(ByteBuffer tableName) throws IOError {
175    try {
176      getAdmin().disableTable(getTableName(tableName));
177    } catch (IOException e) {
178      LOG.warn(e.getMessage(), e);
179      throw getIOError(e);
180    }
181  }
182
183  @Override
184  public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
185    try {
186      return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
187    } catch (IOException e) {
188      LOG.warn(e.getMessage(), e);
189      throw getIOError(e);
190    }
191  }
192
193  @Override
194  public Map<ByteBuffer, Boolean> getTableNamesWithIsTableEnabled() throws IOError {
195    try {
196      HashMap<ByteBuffer, Boolean> tables = new HashMap<>();
197      for (ByteBuffer tableName : this.getTableNames()) {
198        tables.put(tableName, this.isTableEnabled(tableName));
199      }
200      return tables;
201    } catch (IOError e) {
202      LOG.warn(e.getMessage(), e);
203      throw getIOError(e);
204    }
205  }
206
207  // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
208  // table and region.
209  @Override
210  public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
211    try {
212      try {
213        getAdmin().compactRegion(getBytes(tableNameOrRegionName));
214      } catch (IllegalArgumentException e) {
215        // Invalid region, try table
216        getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
217      }
218    } catch (IOException e) {
219      LOG.warn(e.getMessage(), e);
220      throw getIOError(e);
221    }
222  }
223
224  // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
225  // to table and region.
226  @Override
227  public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
228    try {
229      try {
230        getAdmin().compactRegion(getBytes(tableNameOrRegionName));
231      } catch (IllegalArgumentException e) {
232        // Invalid region, try table
233        getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
234      }
235    } catch (IOException e) {
236      LOG.warn(e.getMessage(), e);
237      throw getIOError(e);
238    }
239  }
240
241  @Override
242  public List<ByteBuffer> getTableNames() throws IOError {
243    try {
244      TableName[] tableNames = this.getAdmin().listTableNames();
245      ArrayList<ByteBuffer> list = new ArrayList<>(tableNames.length);
246      for (TableName tableName : tableNames) {
247        list.add(ByteBuffer.wrap(tableName.getName()));
248      }
249      return list;
250    } catch (IOException e) {
251      LOG.warn(e.getMessage(), e);
252      throw getIOError(e);
253    }
254  }
255
256  /**
257   * Returns the list of regions in the given table, or an empty list if the table does not exist
258   */
259  @Override
260  public List<TRegionInfo> getTableRegions(ByteBuffer tableName) throws IOError {
261    try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
262      List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
263      List<TRegionInfo> results = new ArrayList<>(regionLocations.size());
264      for (HRegionLocation regionLocation : regionLocations) {
265        RegionInfo info = regionLocation.getRegionInfo();
266        ServerName serverName = regionLocation.getServerName();
267        TRegionInfo region = new TRegionInfo();
268        region.serverName = ByteBuffer.wrap(Bytes.toBytes(serverName.getHostname()));
269        region.port = serverName.getPort();
270        region.startKey = ByteBuffer.wrap(info.getStartKey());
271        region.endKey = ByteBuffer.wrap(info.getEndKey());
272        region.id = info.getRegionId();
273        region.name = ByteBuffer.wrap(info.getRegionName());
274        region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used
275        results.add(region);
276      }
277      return results;
278    } catch (TableNotFoundException e) {
279      // Return empty list for non-existing table
280      return Collections.emptyList();
281    } catch (IOException e) {
282      LOG.warn(e.getMessage(), e);
283      throw getIOError(e);
284    }
285  }
286
287  @Override
288  public List<TCell> get(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
289    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
290    byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
291    if (famAndQf.length == 1) {
292      return get(tableName, row, famAndQf[0], null, attributes);
293    }
294    if (famAndQf.length == 2) {
295      return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
296    }
297    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
298  }
299
300  /**
301   * Note: this internal interface is slightly different from public APIs in regard to handling of
302   * the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, we
303   * respect qual == null as a request for the entire column family. The caller (
304   * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
305   * column is parse like normal.
306   */
307  protected List<TCell> get(ByteBuffer tableName, ByteBuffer row, byte[] family, byte[] qualifier,
308    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
309    Table table = null;
310    try {
311      table = getTable(tableName);
312      Get get = new Get(getBytes(row));
313      addAttributes(get, attributes);
314      if (qualifier == null) {
315        get.addFamily(family);
316      } else {
317        get.addColumn(family, qualifier);
318      }
319      Result result = table.get(get);
320      return ThriftUtilities.cellFromHBase(result.rawCells());
321    } catch (IOException e) {
322      LOG.warn(e.getMessage(), e);
323      throw getIOError(e);
324    } finally {
325      closeTable(table);
326    }
327  }
328
329  @Override
330  public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
331    int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
332    byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
333    if (famAndQf.length == 1) {
334      return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
335    }
336    if (famAndQf.length == 2) {
337      return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
338    }
339    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
340
341  }
342
343  /**
344   * Note: this public interface is slightly different from public Java APIs in regard to handling
345   * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, we
346   * respect qual == null as a request for the entire column family. If you want to access the
347   * entire column family, use {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a
348   * {@code column} value that lacks a {@code ':'}.
349   */
350  public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family, byte[] qualifier,
351    int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
352
353    Table table = null;
354    try {
355      table = getTable(tableName);
356      Get get = new Get(getBytes(row));
357      addAttributes(get, attributes);
358      if (null == qualifier) {
359        get.addFamily(family);
360      } else {
361        get.addColumn(family, qualifier);
362      }
363      get.setMaxVersions(numVersions);
364      Result result = table.get(get);
365      return ThriftUtilities.cellFromHBase(result.rawCells());
366    } catch (IOException e) {
367      LOG.warn(e.getMessage(), e);
368      throw getIOError(e);
369    } finally {
370      closeTable(table);
371    }
372  }
373
374  @Override
375  public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
376    long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
377    byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
378    if (famAndQf.length == 1) {
379      return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
380    }
381    if (famAndQf.length == 2) {
382      return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions, attributes);
383    }
384    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
385  }
386
387  /**
388   * Note: this internal interface is slightly different from public APIs in regard to handling of
389   * the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, we
390   * respect qual == null as a request for the entire column family. The caller (
391   * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS consistent
392   * in that the column is parse like normal.
393   */
394  protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
395    byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
396    throws IOError {
397
398    Table table = null;
399    try {
400      table = getTable(tableName);
401      Get get = new Get(getBytes(row));
402      addAttributes(get, attributes);
403      if (null == qualifier) {
404        get.addFamily(family);
405      } else {
406        get.addColumn(family, qualifier);
407      }
408      get.setTimeRange(0, timestamp);
409      get.setMaxVersions(numVersions);
410      Result result = table.get(get);
411      return ThriftUtilities.cellFromHBase(result.rawCells());
412    } catch (IOException e) {
413      LOG.warn(e.getMessage(), e);
414      throw getIOError(e);
415    } finally {
416      closeTable(table);
417    }
418  }
419
420  @Override
421  public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
422    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
423    return getRowWithColumnsTs(tableName, row, null, HConstants.LATEST_TIMESTAMP, attributes);
424  }
425
426  @Override
427  public List<TRowResult> getRowWithColumns(ByteBuffer tableName, ByteBuffer row,
428    List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
429    return getRowWithColumnsTs(tableName, row, columns, HConstants.LATEST_TIMESTAMP, attributes);
430  }
431
432  @Override
433  public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp,
434    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
435    return getRowWithColumnsTs(tableName, row, null, timestamp, attributes);
436  }
437
438  @Override
439  public List<TRowResult> getRowWithColumnsTs(ByteBuffer tableName, ByteBuffer row,
440    List<ByteBuffer> columns, long timestamp, Map<ByteBuffer, ByteBuffer> attributes)
441    throws IOError {
442
443    Table table = null;
444    try {
445      table = getTable(tableName);
446      if (columns == null) {
447        Get get = new Get(getBytes(row));
448        addAttributes(get, attributes);
449        get.setTimeRange(0, timestamp);
450        Result result = table.get(get);
451        return ThriftUtilities.rowResultFromHBase(result);
452      }
453      Get get = new Get(getBytes(row));
454      addAttributes(get, attributes);
455      for (ByteBuffer column : columns) {
456        byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
457        if (famAndQf.length == 1) {
458          get.addFamily(famAndQf[0]);
459        } else {
460          get.addColumn(famAndQf[0], famAndQf[1]);
461        }
462      }
463      get.setTimeRange(0, timestamp);
464      Result result = table.get(get);
465      return ThriftUtilities.rowResultFromHBase(result);
466    } catch (IOException e) {
467      LOG.warn(e.getMessage(), e);
468      throw getIOError(e);
469    } finally {
470      closeTable(table);
471    }
472  }
473
474  @Override
475  public List<TRowResult> getRows(ByteBuffer tableName, List<ByteBuffer> rows,
476    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
477    return getRowsWithColumnsTs(tableName, rows, null, HConstants.LATEST_TIMESTAMP, attributes);
478  }
479
480  @Override
481  public List<TRowResult> getRowsWithColumns(ByteBuffer tableName, List<ByteBuffer> rows,
482    List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
483    return getRowsWithColumnsTs(tableName, rows, columns, HConstants.LATEST_TIMESTAMP, attributes);
484  }
485
486  @Override
487  public List<TRowResult> getRowsTs(ByteBuffer tableName, List<ByteBuffer> rows, long timestamp,
488    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
489    return getRowsWithColumnsTs(tableName, rows, null, timestamp, attributes);
490  }
491
492  @Override
493  public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName, List<ByteBuffer> rows,
494    List<ByteBuffer> columns, long timestamp, Map<ByteBuffer, ByteBuffer> attributes)
495    throws IOError {
496
497    Table table = null;
498    try {
499      List<Get> gets = new ArrayList<>(rows.size());
500      table = getTable(tableName);
501      if (metrics != null) {
502        metrics.incNumRowKeysInBatchGet(rows.size());
503      }
504      for (ByteBuffer row : rows) {
505        Get get = new Get(getBytes(row));
506        addAttributes(get, attributes);
507        if (columns != null) {
508
509          for (ByteBuffer column : columns) {
510            byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
511            if (famAndQf.length == 1) {
512              get.addFamily(famAndQf[0]);
513            } else {
514              get.addColumn(famAndQf[0], famAndQf[1]);
515            }
516          }
517        }
518        get.setTimeRange(0, timestamp);
519        gets.add(get);
520      }
521      Result[] result = table.get(gets);
522      return ThriftUtilities.rowResultFromHBase(result);
523    } catch (IOException e) {
524      LOG.warn(e.getMessage(), e);
525      throw getIOError(e);
526    } finally {
527      closeTable(table);
528    }
529  }
530
531  @Override
532  public void deleteAll(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
533    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
534    deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP, attributes);
535  }
536
537  @Override
538  public void deleteAllTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long timestamp,
539    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
540    Table table = null;
541    try {
542      table = getTable(tableName);
543      Delete delete = new Delete(getBytes(row));
544      addAttributes(delete, attributes);
545      byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
546      if (famAndQf.length == 1) {
547        delete.addFamily(famAndQf[0], timestamp);
548      } else {
549        delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
550      }
551      table.delete(delete);
552
553    } catch (IOException e) {
554      LOG.warn(e.getMessage(), e);
555      throw getIOError(e);
556    } finally {
557      closeTable(table);
558    }
559  }
560
561  @Override
562  public void deleteAllRow(ByteBuffer tableName, ByteBuffer row,
563    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
564    deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
565  }
566
567  @Override
568  public void deleteAllRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp,
569    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
570    Table table = null;
571    try {
572      table = getTable(tableName);
573      Delete delete = new Delete(getBytes(row), timestamp);
574      addAttributes(delete, attributes);
575      table.delete(delete);
576    } catch (IOException e) {
577      LOG.warn(e.getMessage(), e);
578      throw getIOError(e);
579    } finally {
580      closeTable(table);
581    }
582  }
583
584  @Override
585  public void createTable(ByteBuffer in_tableName, List<ColumnDescriptor> columnFamilies)
586    throws IOError, IllegalArgument, AlreadyExists {
587    TableName tableName = getTableName(in_tableName);
588    try {
589      if (getAdmin().tableExists(tableName)) {
590        throw new AlreadyExists("table name already in use");
591      }
592      HTableDescriptor desc = new HTableDescriptor(tableName);
593      for (ColumnDescriptor col : columnFamilies) {
594        HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
595        desc.addFamily(colDesc);
596      }
597      getAdmin().createTable(desc);
598    } catch (IOException e) {
599      LOG.warn(e.getMessage(), e);
600      throw getIOError(e);
601    } catch (IllegalArgumentException e) {
602      LOG.warn(e.getMessage(), e);
603      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
604    }
605  }
606
607  private static TableName getTableName(ByteBuffer buffer) {
608    return TableName.valueOf(getBytes(buffer));
609  }
610
611  @Override
612  public void deleteTable(ByteBuffer in_tableName) throws IOError {
613    TableName tableName = getTableName(in_tableName);
614    if (LOG.isDebugEnabled()) {
615      LOG.debug("deleteTable: table={}", tableName);
616    }
617    try {
618      if (!getAdmin().tableExists(tableName)) {
619        throw new IOException("table does not exist");
620      }
621      getAdmin().deleteTable(tableName);
622    } catch (IOException e) {
623      LOG.warn(e.getMessage(), e);
624      throw getIOError(e);
625    }
626  }
627
628  @Override
629  public void mutateRow(ByteBuffer tableName, ByteBuffer row, List<Mutation> mutations,
630    Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument {
631    mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes);
632  }
633
634  @Override
635  public void mutateRowTs(ByteBuffer tableName, ByteBuffer row, List<Mutation> mutations,
636    long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument {
637    Table table = null;
638    try {
639      table = getTable(tableName);
640      Put put = new Put(getBytes(row), timestamp);
641      addAttributes(put, attributes);
642
643      Delete delete = new Delete(getBytes(row));
644      addAttributes(delete, attributes);
645      if (metrics != null) {
646        metrics.incNumRowKeysInBatchMutate(mutations.size());
647      }
648
649      // I apologize for all this mess :)
650      CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
651      for (Mutation m : mutations) {
652        byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
653        if (m.isDelete) {
654          if (famAndQf.length == 1) {
655            delete.addFamily(famAndQf[0], timestamp);
656          } else {
657            delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
658          }
659          delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
660        } else {
661          if (famAndQf.length == 1) {
662            LOG.warn("No column qualifier specified. Delete is the only mutation supported "
663              + "over the whole column family.");
664          } else {
665            put.add(builder.clear().setRow(put.getRow()).setFamily(famAndQf[0])
666              .setQualifier(famAndQf[1]).setTimestamp(put.getTimestamp()).setType(Cell.Type.Put)
667              .setValue(m.value != null ? getBytes(m.value) : HConstants.EMPTY_BYTE_ARRAY).build());
668          }
669          put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
670        }
671      }
672      if (!delete.isEmpty()) {
673        table.delete(delete);
674      }
675      if (!put.isEmpty()) {
676        table.put(put);
677      }
678    } catch (IOException e) {
679      LOG.warn(e.getMessage(), e);
680      throw getIOError(e);
681    } catch (IllegalArgumentException e) {
682      LOG.warn(e.getMessage(), e);
683      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
684    } finally {
685      closeTable(table);
686    }
687  }
688
689  @Override
690  public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
691    Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument, TException {
692    mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
693  }
694
695  @Override
696  public void mutateRowsTs(ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
697    Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument, TException {
698    List<Put> puts = new ArrayList<>();
699    List<Delete> deletes = new ArrayList<>();
700    CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
701    for (BatchMutation batch : rowBatches) {
702      byte[] row = getBytes(batch.row);
703      List<Mutation> mutations = batch.mutations;
704      Delete delete = new Delete(row);
705      addAttributes(delete, attributes);
706      Put put = new Put(row, timestamp);
707      addAttributes(put, attributes);
708      for (Mutation m : mutations) {
709        byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
710        if (m.isDelete) {
711          // no qualifier, family only.
712          if (famAndQf.length == 1) {
713            delete.addFamily(famAndQf[0], timestamp);
714          } else {
715            delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
716          }
717          delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
718        } else {
719          if (famAndQf.length == 1) {
720            LOG.warn("No column qualifier specified. Delete is the only mutation supported "
721              + "over the whole column family.");
722          }
723          if (famAndQf.length == 2) {
724            try {
725              put.add(builder.clear().setRow(put.getRow()).setFamily(famAndQf[0])
726                .setQualifier(famAndQf[1]).setTimestamp(put.getTimestamp()).setType(Cell.Type.Put)
727                .setValue(m.value != null ? getBytes(m.value) : HConstants.EMPTY_BYTE_ARRAY)
728                .build());
729            } catch (IOException e) {
730              throw new IllegalArgumentException(e);
731            }
732          } else {
733            throw new IllegalArgumentException("Invalid famAndQf provided.");
734          }
735          put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
736        }
737      }
738      if (!delete.isEmpty()) {
739        deletes.add(delete);
740      }
741      if (!put.isEmpty()) {
742        puts.add(put);
743      }
744    }
745
746    Table table = null;
747    try {
748      table = getTable(tableName);
749      if (!puts.isEmpty()) {
750        table.put(puts);
751      }
752      if (!deletes.isEmpty()) {
753        table.delete(deletes);
754      }
755    } catch (IOException e) {
756      LOG.warn(e.getMessage(), e);
757      throw getIOError(e);
758    } catch (IllegalArgumentException e) {
759      LOG.warn(e.getMessage(), e);
760      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
761    } finally {
762      closeTable(table);
763    }
764  }
765
766  @Override
767  public long atomicIncrement(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
768    throws IOError, IllegalArgument, TException {
769    byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
770    if (famAndQf.length == 1) {
771      return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
772    }
773    return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
774  }
775
776  protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, byte[] family,
777    byte[] qualifier, long amount) throws IOError, IllegalArgument, TException {
778    Table table = null;
779    try {
780      table = getTable(tableName);
781      return table.incrementColumnValue(getBytes(row), family, qualifier, amount);
782    } catch (IOException e) {
783      LOG.warn(e.getMessage(), e);
784      throw getIOError(e);
785    } finally {
786      closeTable(table);
787    }
788  }
789
790  @Override
791  public void scannerClose(int id) throws IOError, IllegalArgument {
792    LOG.debug("scannerClose: id={}", id);
793    ResultScannerWrapper resultScannerWrapper = getScanner(id);
794    if (resultScannerWrapper == null) {
795      LOG.warn("scanner ID is invalid");
796      throw new IllegalArgument("scanner ID is invalid");
797    }
798    resultScannerWrapper.getScanner().close();
799    removeScanner(id);
800  }
801
802  @Override
803  public List<TRowResult> scannerGetList(int id, int nbRows) throws IllegalArgument, IOError {
804    LOG.debug("scannerGetList: id={}", id);
805    ResultScannerWrapper resultScannerWrapper = getScanner(id);
806    if (null == resultScannerWrapper) {
807      String message = "scanner ID is invalid";
808      LOG.warn(message);
809      throw new IllegalArgument("scanner ID is invalid");
810    }
811
812    Result[] results;
813    try {
814      results = resultScannerWrapper.getScanner().next(nbRows);
815      if (null == results) {
816        return new ArrayList<>();
817      }
818    } catch (IOException e) {
819      LOG.warn(e.getMessage(), e);
820      throw getIOError(e);
821    } finally {
822      // Add scanner back to scannerMap; protects against case
823      // where scanner expired during processing of request.
824      scannerMap.put(id, resultScannerWrapper);
825    }
826    return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
827  }
828
829  @Override
830  public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
831    return scannerGetList(id, 1);
832  }
833
834  @Override
835  public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
836    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
837
838    Table table = null;
839    try {
840      table = getTable(tableName);
841      Scan scan = new Scan();
842      addAttributes(scan, attributes);
843      if (tScan.isSetStartRow()) {
844        scan.setStartRow(tScan.getStartRow());
845      }
846      if (tScan.isSetStopRow()) {
847        scan.setStopRow(tScan.getStopRow());
848      }
849      if (tScan.isSetTimestamp()) {
850        scan.setTimeRange(0, tScan.getTimestamp());
851      }
852      if (tScan.isSetCaching()) {
853        scan.setCaching(tScan.getCaching());
854      }
855      if (tScan.isSetBatchSize()) {
856        scan.setBatch(tScan.getBatchSize());
857      }
858      if (tScan.isSetColumns() && !tScan.getColumns().isEmpty()) {
859        for (ByteBuffer column : tScan.getColumns()) {
860          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
861          if (famQf.length == 1) {
862            scan.addFamily(famQf[0]);
863          } else {
864            scan.addColumn(famQf[0], famQf[1]);
865          }
866        }
867      }
868      if (tScan.isSetFilterString()) {
869        ParseFilter parseFilter = new ParseFilter();
870        scan.setFilter(parseFilter.parseFilterString(tScan.getFilterString()));
871      }
872      if (tScan.isSetReversed()) {
873        scan.setReversed(tScan.isReversed());
874      }
875      if (tScan.isSetCacheBlocks()) {
876        scan.setCacheBlocks(tScan.isCacheBlocks());
877      }
878      return addScanner(table.getScanner(scan), tScan.sortColumns);
879    } catch (IOException e) {
880      LOG.warn(e.getMessage(), e);
881      throw getIOError(e);
882    } finally {
883      closeTable(table);
884    }
885  }
886
887  @Override
888  public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, List<ByteBuffer> columns,
889    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
890
891    Table table = null;
892    try {
893      table = getTable(tableName);
894      Scan scan = new Scan(getBytes(startRow));
895      addAttributes(scan, attributes);
896      if (columns != null && !columns.isEmpty()) {
897        for (ByteBuffer column : columns) {
898          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
899          if (famQf.length == 1) {
900            scan.addFamily(famQf[0]);
901          } else {
902            scan.addColumn(famQf[0], famQf[1]);
903          }
904        }
905      }
906      return addScanner(table.getScanner(scan), false);
907    } catch (IOException e) {
908      LOG.warn(e.getMessage(), e);
909      throw getIOError(e);
910    } finally {
911      closeTable(table);
912    }
913  }
914
915  @Override
916  public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow, ByteBuffer stopRow,
917    List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
918
919    Table table = null;
920    try {
921      table = getTable(tableName);
922      Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
923      addAttributes(scan, attributes);
924      if (columns != null && !columns.isEmpty()) {
925        for (ByteBuffer column : columns) {
926          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
927          if (famQf.length == 1) {
928            scan.addFamily(famQf[0]);
929          } else {
930            scan.addColumn(famQf[0], famQf[1]);
931          }
932        }
933      }
934      return addScanner(table.getScanner(scan), false);
935    } catch (IOException e) {
936      LOG.warn(e.getMessage(), e);
937      throw getIOError(e);
938    } finally {
939      closeTable(table);
940    }
941  }
942
943  @Override
944  public int scannerOpenWithPrefix(ByteBuffer tableName, ByteBuffer startAndPrefix,
945    List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
946
947    Table table = null;
948    try {
949      table = getTable(tableName);
950      Scan scan = new Scan(getBytes(startAndPrefix));
951      addAttributes(scan, attributes);
952      Filter f = new WhileMatchFilter(new PrefixFilter(getBytes(startAndPrefix)));
953      scan.setFilter(f);
954      if (columns != null && !columns.isEmpty()) {
955        for (ByteBuffer column : columns) {
956          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
957          if (famQf.length == 1) {
958            scan.addFamily(famQf[0]);
959          } else {
960            scan.addColumn(famQf[0], famQf[1]);
961          }
962        }
963      }
964      return addScanner(table.getScanner(scan), false);
965    } catch (IOException e) {
966      LOG.warn(e.getMessage(), e);
967      throw getIOError(e);
968    } finally {
969      closeTable(table);
970    }
971  }
972
973  @Override
974  public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow, List<ByteBuffer> columns,
975    long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
976
977    Table table = null;
978    try {
979      table = getTable(tableName);
980      Scan scan = new Scan(getBytes(startRow));
981      addAttributes(scan, attributes);
982      scan.setTimeRange(0, timestamp);
983      if (columns != null && !columns.isEmpty()) {
984        for (ByteBuffer column : columns) {
985          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
986          if (famQf.length == 1) {
987            scan.addFamily(famQf[0]);
988          } else {
989            scan.addColumn(famQf[0], famQf[1]);
990          }
991        }
992      }
993      return addScanner(table.getScanner(scan), false);
994    } catch (IOException e) {
995      LOG.warn(e.getMessage(), e);
996      throw getIOError(e);
997    } finally {
998      closeTable(table);
999    }
1000  }
1001
1002  @Override
1003  public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow, ByteBuffer stopRow,
1004    List<ByteBuffer> columns, long timestamp, Map<ByteBuffer, ByteBuffer> attributes)
1005    throws IOError, TException {
1006
1007    Table table = null;
1008    try {
1009      table = getTable(tableName);
1010      Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1011      addAttributes(scan, attributes);
1012      scan.setTimeRange(0, timestamp);
1013      if (columns != null && !columns.isEmpty()) {
1014        for (ByteBuffer column : columns) {
1015          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
1016          if (famQf.length == 1) {
1017            scan.addFamily(famQf[0]);
1018          } else {
1019            scan.addColumn(famQf[0], famQf[1]);
1020          }
1021        }
1022      }
1023      scan.setTimeRange(0, timestamp);
1024      return addScanner(table.getScanner(scan), false);
1025    } catch (IOException e) {
1026      LOG.warn(e.getMessage(), e);
1027      throw getIOError(e);
1028    } finally {
1029      closeTable(table);
1030    }
1031  }
1032
1033  @Override
1034  public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(ByteBuffer tableName)
1035    throws IOError, TException {
1036
1037    Table table = null;
1038    try {
1039      TreeMap<ByteBuffer, ColumnDescriptor> columns = new TreeMap<>();
1040
1041      table = getTable(tableName);
1042      HTableDescriptor desc = table.getTableDescriptor();
1043
1044      for (HColumnDescriptor e : desc.getFamilies()) {
1045        ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1046        columns.put(col.name, col);
1047      }
1048      return columns;
1049    } catch (IOException e) {
1050      LOG.warn(e.getMessage(), e);
1051      throw getIOError(e);
1052    } finally {
1053      closeTable(table);
1054    }
1055  }
1056
1057  private void closeTable(Table table) throws IOError {
1058    try {
1059      if (table != null) {
1060        table.close();
1061      }
1062    } catch (IOException e) {
1063      LOG.error(e.getMessage(), e);
1064      throw getIOError(e);
1065    }
1066  }
1067
1068  @Override
1069  public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1070    try {
1071      byte[] row = getBytes(searchRow);
1072      Result startRowResult =
1073        getReverseScanResult(TableName.META_TABLE_NAME.getName(), row, HConstants.CATALOG_FAMILY);
1074
1075      if (startRowResult == null) {
1076        throw new IOException(
1077          "Cannot find row in " + TableName.META_TABLE_NAME + ", row=" + Bytes.toStringBinary(row));
1078      }
1079
1080      // find region start and end keys
1081      RegionInfo regionInfo = MetaTableAccessor.getRegionInfo(startRowResult);
1082      if (regionInfo == null) {
1083        throw new IOException("RegionInfo REGIONINFO was null or " + " empty in Meta for row="
1084          + Bytes.toStringBinary(row));
1085      }
1086      TRegionInfo region = new TRegionInfo();
1087      region.setStartKey(regionInfo.getStartKey());
1088      region.setEndKey(regionInfo.getEndKey());
1089      region.id = regionInfo.getRegionId();
1090      region.setName(regionInfo.getRegionName());
1091      region.version = HREGION_VERSION; // version not used anymore, PB encoding used.
1092
1093      // find region assignment to server
1094      ServerName serverName = MetaTableAccessor.getServerName(startRowResult, 0);
1095      if (serverName != null) {
1096        region.setServerName(Bytes.toBytes(serverName.getHostname()));
1097        region.port = serverName.getPort();
1098      }
1099      return region;
1100    } catch (IOException e) {
1101      LOG.warn(e.getMessage(), e);
1102      throw getIOError(e);
1103    }
1104  }
1105
1106  private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family)
1107    throws IOException {
1108    Scan scan = new Scan(row);
1109    scan.setReversed(true);
1110    scan.addFamily(family);
1111    scan.setStartRow(row);
1112    try (Table table = getTable(tableName); ResultScanner scanner = table.getScanner(scan)) {
1113      return scanner.next();
1114    }
1115  }
1116
1117  @Override
1118  public void increment(TIncrement tincrement) throws IOError, TException {
1119
1120    if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1121      throw new TException("Must supply a table and a row key; can't increment");
1122    }
1123
1124    if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1125      this.coalescer.queueIncrement(tincrement);
1126      return;
1127    }
1128
1129    Table table = null;
1130    try {
1131      table = getTable(tincrement.getTable());
1132      Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1133      table.increment(inc);
1134    } catch (IOException e) {
1135      LOG.warn(e.getMessage(), e);
1136      throw getIOError(e);
1137    } finally {
1138      closeTable(table);
1139    }
1140  }
1141
1142  @Override
1143  public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1144    if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1145      this.coalescer.queueIncrements(tincrements);
1146      return;
1147    }
1148    for (TIncrement tinc : tincrements) {
1149      increment(tinc);
1150    }
1151  }
1152
1153  @Override
1154  public List<TCell> append(TAppend tappend) throws IOError, TException {
1155    if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1156      throw new TException("Must supply a table and a row key; can't append");
1157    }
1158
1159    Table table = null;
1160    try {
1161      table = getTable(tappend.getTable());
1162      Append append = ThriftUtilities.appendFromThrift(tappend);
1163      Result result = table.append(append);
1164      return ThriftUtilities.cellFromHBase(result.rawCells());
1165    } catch (IOException e) {
1166      LOG.warn(e.getMessage(), e);
1167      throw getIOError(e);
1168    } finally {
1169      closeTable(table);
1170    }
1171  }
1172
1173  @Override
1174  public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1175    ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes)
1176    throws IOError, IllegalArgument, TException {
1177    Put put;
1178    try {
1179      put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1180      addAttributes(put, attributes);
1181
1182      byte[][] famAndQf = CellUtil.parseColumn(getBytes(mput.column));
1183      put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
1184        .setFamily(famAndQf[0]).setQualifier(famAndQf[1]).setTimestamp(put.getTimestamp())
1185        .setType(Cell.Type.Put)
1186        .setValue(mput.value != null ? getBytes(mput.value) : HConstants.EMPTY_BYTE_ARRAY).build());
1187      put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1188    } catch (IOException | IllegalArgumentException e) {
1189      LOG.warn(e.getMessage(), e);
1190      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1191    }
1192
1193    Table table = null;
1194    try {
1195      table = getTable(tableName);
1196      byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
1197      Table.CheckAndMutateBuilder mutateBuilder =
1198        table.checkAndMutate(getBytes(row), famAndQf[0]).qualifier(famAndQf[1]);
1199      if (value != null) {
1200        return mutateBuilder.ifEquals(getBytes(value)).thenPut(put);
1201      } else {
1202        return mutateBuilder.ifNotExists().thenPut(put);
1203      }
1204    } catch (IOException e) {
1205      LOG.warn(e.getMessage(), e);
1206      throw getIOError(e);
1207    } catch (IllegalArgumentException e) {
1208      LOG.warn(e.getMessage(), e);
1209      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1210    } finally {
1211      closeTable(table);
1212    }
1213  }
1214
1215  @Override
1216  public TThriftServerType getThriftServerType() {
1217    return TThriftServerType.ONE;
1218  }
1219
1220  @Override
1221  public String getClusterId() throws TException {
1222    return connectionCache.getClusterId();
1223  }
1224
1225  private static IOError getIOError(Throwable throwable) {
1226    IOError error = new IOErrorWithCause(throwable);
1227    error.setCanRetry(!(throwable instanceof DoNotRetryIOException));
1228    error.setMessage(Throwables.getStackTraceAsString(throwable));
1229    return error;
1230  }
1231
1232  /**
1233   * Adds all the attributes into the Operation object
1234   */
1235  private static void addAttributes(OperationWithAttributes op,
1236    Map<ByteBuffer, ByteBuffer> attributes) {
1237    if (attributes == null || attributes.isEmpty()) {
1238      return;
1239    }
1240    for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1241      String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1242      byte[] value = getBytes(entry.getValue());
1243      op.setAttribute(name, value);
1244    }
1245  }
1246
1247  protected static class ResultScannerWrapper {
1248
1249    private final ResultScanner scanner;
1250    private final boolean sortColumns;
1251
1252    public ResultScannerWrapper(ResultScanner resultScanner, boolean sortResultColumns) {
1253      scanner = resultScanner;
1254      sortColumns = sortResultColumns;
1255    }
1256
1257    public ResultScanner getScanner() {
1258      return scanner;
1259    }
1260
1261    public boolean isColumnSorted() {
1262      return sortColumns;
1263    }
1264  }
1265
1266  public static class IOErrorWithCause extends IOError {
1267    private final Throwable cause;
1268
1269    public IOErrorWithCause(Throwable cause) {
1270      this.cause = cause;
1271    }
1272
1273    @Override
1274    public synchronized Throwable getCause() {
1275      return cause;
1276    }
1277
1278    @Override
1279    public boolean equals(Object other) {
1280      if (super.equals(other) && other instanceof IOErrorWithCause) {
1281        Throwable otherCause = ((IOErrorWithCause) other).getCause();
1282        if (this.getCause() != null) {
1283          return otherCause != null && this.getCause().equals(otherCause);
1284        } else {
1285          return otherCause == null;
1286        }
1287      }
1288      return false;
1289    }
1290
1291    @Override
1292    public int hashCode() {
1293      int result = super.hashCode();
1294      result = 31 * result + (cause != null ? cause.hashCode() : 0);
1295      return result;
1296    }
1297  }
1298
1299}