001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.thrift; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; 021import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; 022import static org.apache.hadoop.hbase.thrift.Constants.COALESCE_INC_KEY; 023import static org.apache.hadoop.hbase.util.Bytes.getBytes; 024 025import java.io.IOException; 026import java.nio.ByteBuffer; 027import java.util.ArrayList; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.List; 031import java.util.Map; 032import java.util.TreeMap; 033import java.util.concurrent.TimeUnit; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.CellBuilder; 037import org.apache.hadoop.hbase.CellBuilderFactory; 038import org.apache.hadoop.hbase.CellBuilderType; 039import org.apache.hadoop.hbase.CellUtil; 040import org.apache.hadoop.hbase.DoNotRetryIOException; 041import org.apache.hadoop.hbase.HColumnDescriptor; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.HRegionLocation; 044import org.apache.hadoop.hbase.HTableDescriptor; 045import org.apache.hadoop.hbase.KeyValue; 046import org.apache.hadoop.hbase.MetaTableAccessor; 047import org.apache.hadoop.hbase.ServerName; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.TableNotFoundException; 050import org.apache.hadoop.hbase.client.Append; 051import org.apache.hadoop.hbase.client.Delete; 052import org.apache.hadoop.hbase.client.Durability; 053import org.apache.hadoop.hbase.client.Get; 054import org.apache.hadoop.hbase.client.Increment; 055import org.apache.hadoop.hbase.client.OperationWithAttributes; 056import org.apache.hadoop.hbase.client.Put; 057import org.apache.hadoop.hbase.client.RegionInfo; 058import org.apache.hadoop.hbase.client.RegionLocator; 059import org.apache.hadoop.hbase.client.Result; 060import org.apache.hadoop.hbase.client.ResultScanner; 061import org.apache.hadoop.hbase.client.Scan; 062import org.apache.hadoop.hbase.client.Table; 063import org.apache.hadoop.hbase.filter.Filter; 064import org.apache.hadoop.hbase.filter.ParseFilter; 065import org.apache.hadoop.hbase.filter.PrefixFilter; 066import org.apache.hadoop.hbase.filter.WhileMatchFilter; 067import org.apache.hadoop.hbase.security.UserProvider; 068import org.apache.hadoop.hbase.thrift.generated.AlreadyExists; 069import org.apache.hadoop.hbase.thrift.generated.BatchMutation; 070import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; 071import org.apache.hadoop.hbase.thrift.generated.Hbase; 072import org.apache.hadoop.hbase.thrift.generated.IOError; 073import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; 074import org.apache.hadoop.hbase.thrift.generated.Mutation; 075import org.apache.hadoop.hbase.thrift.generated.TAppend; 076import org.apache.hadoop.hbase.thrift.generated.TCell; 077import org.apache.hadoop.hbase.thrift.generated.TIncrement; 078import org.apache.hadoop.hbase.thrift.generated.TRegionInfo; 079import org.apache.hadoop.hbase.thrift.generated.TRowResult; 080import org.apache.hadoop.hbase.thrift.generated.TScan; 081import org.apache.hadoop.hbase.thrift.generated.TThriftServerType; 082import org.apache.hadoop.hbase.util.Bytes; 083import org.apache.thrift.TException; 084import org.apache.yetus.audience.InterfaceAudience; 085import org.slf4j.Logger; 086import org.slf4j.LoggerFactory; 087 088import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 089import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 090import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 091 092/** 093 * The HBaseServiceHandler is a glue object that connects Thrift RPC calls to the HBase client API 094 * primarily defined in the Admin and Table objects. 095 */ 096@InterfaceAudience.Private 097@SuppressWarnings("deprecation") 098public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements Hbase.Iface { 099 private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class); 100 101 public static final int HREGION_VERSION = 1; 102 103 // nextScannerId and scannerMap are used to manage scanner state 104 private int nextScannerId = 0; 105 private Cache<Integer, ResultScannerWrapper> scannerMap; 106 IncrementCoalescer coalescer; 107 108 /** 109 * Returns a list of all the column families for a given Table. 110 * @param table table 111 */ 112 byte[][] getAllColumns(Table table) throws IOException { 113 HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies(); 114 byte[][] columns = new byte[cds.length][]; 115 for (int i = 0; i < cds.length; i++) { 116 columns[i] = Bytes.add(cds[i].getName(), KeyValue.COLUMN_FAMILY_DELIM_ARRAY); 117 } 118 return columns; 119 } 120 121 /** 122 * Assigns a unique ID to the scanner and adds the mapping to an internal hash-map. 123 * @param scanner the {@link ResultScanner} to add 124 * @return integer scanner id 125 */ 126 protected synchronized int addScanner(ResultScanner scanner, boolean sortColumns) { 127 int id = nextScannerId++; 128 ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns); 129 scannerMap.put(id, resultScannerWrapper); 130 return id; 131 } 132 133 /** 134 * Returns the scanner associated with the specified ID. 135 * @param id the ID of the scanner to get 136 * @return a Scanner, or null if ID was invalid. 137 */ 138 private synchronized ResultScannerWrapper getScanner(int id) { 139 return scannerMap.getIfPresent(id); 140 } 141 142 /** 143 * Removes the scanner associated with the specified ID from the internal id->scanner hash-map. 144 * @param id the ID of the scanner to remove 145 */ 146 private synchronized void removeScanner(int id) { 147 scannerMap.invalidate(id); 148 } 149 150 protected ThriftHBaseServiceHandler(final Configuration c, final UserProvider userProvider) 151 throws IOException { 152 super(c, userProvider); 153 long cacheTimeout = 154 c.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD) 155 * 2; 156 157 scannerMap = 158 CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS).build(); 159 160 this.coalescer = new IncrementCoalescer(this); 161 } 162 163 @Override 164 public void enableTable(ByteBuffer tableName) throws IOError { 165 try { 166 getAdmin().enableTable(getTableName(tableName)); 167 } catch (IOException e) { 168 LOG.warn(e.getMessage(), e); 169 throw getIOError(e); 170 } 171 } 172 173 @Override 174 public void disableTable(ByteBuffer tableName) throws IOError { 175 try { 176 getAdmin().disableTable(getTableName(tableName)); 177 } catch (IOException e) { 178 LOG.warn(e.getMessage(), e); 179 throw getIOError(e); 180 } 181 } 182 183 @Override 184 public boolean isTableEnabled(ByteBuffer tableName) throws IOError { 185 try { 186 return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName)); 187 } catch (IOException e) { 188 LOG.warn(e.getMessage(), e); 189 throw getIOError(e); 190 } 191 } 192 193 @Override 194 public Map<ByteBuffer, Boolean> getTableNamesWithIsTableEnabled() throws IOError { 195 try { 196 HashMap<ByteBuffer, Boolean> tables = new HashMap<>(); 197 for (ByteBuffer tableName : this.getTableNames()) { 198 tables.put(tableName, this.isTableEnabled(tableName)); 199 } 200 return tables; 201 } catch (IOError e) { 202 LOG.warn(e.getMessage(), e); 203 throw getIOError(e); 204 } 205 } 206 207 // ThriftServerRunner.compact should be deprecated and replaced with methods specific to 208 // table and region. 209 @Override 210 public void compact(ByteBuffer tableNameOrRegionName) throws IOError { 211 try { 212 try { 213 getAdmin().compactRegion(getBytes(tableNameOrRegionName)); 214 } catch (IllegalArgumentException e) { 215 // Invalid region, try table 216 getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName))); 217 } 218 } catch (IOException e) { 219 LOG.warn(e.getMessage(), e); 220 throw getIOError(e); 221 } 222 } 223 224 // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific 225 // to table and region. 226 @Override 227 public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError { 228 try { 229 try { 230 getAdmin().compactRegion(getBytes(tableNameOrRegionName)); 231 } catch (IllegalArgumentException e) { 232 // Invalid region, try table 233 getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName))); 234 } 235 } catch (IOException e) { 236 LOG.warn(e.getMessage(), e); 237 throw getIOError(e); 238 } 239 } 240 241 @Override 242 public List<ByteBuffer> getTableNames() throws IOError { 243 try { 244 TableName[] tableNames = this.getAdmin().listTableNames(); 245 ArrayList<ByteBuffer> list = new ArrayList<>(tableNames.length); 246 for (TableName tableName : tableNames) { 247 list.add(ByteBuffer.wrap(tableName.getName())); 248 } 249 return list; 250 } catch (IOException e) { 251 LOG.warn(e.getMessage(), e); 252 throw getIOError(e); 253 } 254 } 255 256 /** 257 * Returns the list of regions in the given table, or an empty list if the table does not exist 258 */ 259 @Override 260 public List<TRegionInfo> getTableRegions(ByteBuffer tableName) throws IOError { 261 try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) { 262 List<HRegionLocation> regionLocations = locator.getAllRegionLocations(); 263 List<TRegionInfo> results = new ArrayList<>(regionLocations.size()); 264 for (HRegionLocation regionLocation : regionLocations) { 265 RegionInfo info = regionLocation.getRegionInfo(); 266 ServerName serverName = regionLocation.getServerName(); 267 TRegionInfo region = new TRegionInfo(); 268 region.serverName = ByteBuffer.wrap(Bytes.toBytes(serverName.getHostname())); 269 region.port = serverName.getPort(); 270 region.startKey = ByteBuffer.wrap(info.getStartKey()); 271 region.endKey = ByteBuffer.wrap(info.getEndKey()); 272 region.id = info.getRegionId(); 273 region.name = ByteBuffer.wrap(info.getRegionName()); 274 region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used 275 results.add(region); 276 } 277 return results; 278 } catch (TableNotFoundException e) { 279 // Return empty list for non-existing table 280 return Collections.emptyList(); 281 } catch (IOException e) { 282 LOG.warn(e.getMessage(), e); 283 throw getIOError(e); 284 } 285 } 286 287 @Override 288 public List<TCell> get(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, 289 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 290 byte[][] famAndQf = CellUtil.parseColumn(getBytes(column)); 291 if (famAndQf.length == 1) { 292 return get(tableName, row, famAndQf[0], null, attributes); 293 } 294 if (famAndQf.length == 2) { 295 return get(tableName, row, famAndQf[0], famAndQf[1], attributes); 296 } 297 throw new IllegalArgumentException("Invalid familyAndQualifier provided."); 298 } 299 300 /** 301 * Note: this internal interface is slightly different from public APIs in regard to handling of 302 * the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, we 303 * respect qual == null as a request for the entire column family. The caller ( 304 * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the 305 * column is parse like normal. 306 */ 307 protected List<TCell> get(ByteBuffer tableName, ByteBuffer row, byte[] family, byte[] qualifier, 308 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 309 Table table = null; 310 try { 311 table = getTable(tableName); 312 Get get = new Get(getBytes(row)); 313 addAttributes(get, attributes); 314 if (qualifier == null) { 315 get.addFamily(family); 316 } else { 317 get.addColumn(family, qualifier); 318 } 319 Result result = table.get(get); 320 return ThriftUtilities.cellFromHBase(result.rawCells()); 321 } catch (IOException e) { 322 LOG.warn(e.getMessage(), e); 323 throw getIOError(e); 324 } finally { 325 closeTable(table); 326 } 327 } 328 329 @Override 330 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, 331 int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 332 byte[][] famAndQf = CellUtil.parseColumn(getBytes(column)); 333 if (famAndQf.length == 1) { 334 return getVer(tableName, row, famAndQf[0], null, numVersions, attributes); 335 } 336 if (famAndQf.length == 2) { 337 return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes); 338 } 339 throw new IllegalArgumentException("Invalid familyAndQualifier provided."); 340 341 } 342 343 /** 344 * Note: this public interface is slightly different from public Java APIs in regard to handling 345 * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, we 346 * respect qual == null as a request for the entire column family. If you want to access the 347 * entire column family, use {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a 348 * {@code column} value that lacks a {@code ':'}. 349 */ 350 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family, byte[] qualifier, 351 int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 352 353 Table table = null; 354 try { 355 table = getTable(tableName); 356 Get get = new Get(getBytes(row)); 357 addAttributes(get, attributes); 358 if (null == qualifier) { 359 get.addFamily(family); 360 } else { 361 get.addColumn(family, qualifier); 362 } 363 get.setMaxVersions(numVersions); 364 Result result = table.get(get); 365 return ThriftUtilities.cellFromHBase(result.rawCells()); 366 } catch (IOException e) { 367 LOG.warn(e.getMessage(), e); 368 throw getIOError(e); 369 } finally { 370 closeTable(table); 371 } 372 } 373 374 @Override 375 public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, 376 long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 377 byte[][] famAndQf = CellUtil.parseColumn(getBytes(column)); 378 if (famAndQf.length == 1) { 379 return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes); 380 } 381 if (famAndQf.length == 2) { 382 return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions, attributes); 383 } 384 throw new IllegalArgumentException("Invalid familyAndQualifier provided."); 385 } 386 387 /** 388 * Note: this internal interface is slightly different from public APIs in regard to handling of 389 * the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, we 390 * respect qual == null as a request for the entire column family. The caller ( 391 * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS consistent 392 * in that the column is parse like normal. 393 */ 394 protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family, 395 byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) 396 throws IOError { 397 398 Table table = null; 399 try { 400 table = getTable(tableName); 401 Get get = new Get(getBytes(row)); 402 addAttributes(get, attributes); 403 if (null == qualifier) { 404 get.addFamily(family); 405 } else { 406 get.addColumn(family, qualifier); 407 } 408 get.setTimeRange(0, timestamp); 409 get.setMaxVersions(numVersions); 410 Result result = table.get(get); 411 return ThriftUtilities.cellFromHBase(result.rawCells()); 412 } catch (IOException e) { 413 LOG.warn(e.getMessage(), e); 414 throw getIOError(e); 415 } finally { 416 closeTable(table); 417 } 418 } 419 420 @Override 421 public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row, 422 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 423 return getRowWithColumnsTs(tableName, row, null, HConstants.LATEST_TIMESTAMP, attributes); 424 } 425 426 @Override 427 public List<TRowResult> getRowWithColumns(ByteBuffer tableName, ByteBuffer row, 428 List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 429 return getRowWithColumnsTs(tableName, row, columns, HConstants.LATEST_TIMESTAMP, attributes); 430 } 431 432 @Override 433 public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp, 434 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 435 return getRowWithColumnsTs(tableName, row, null, timestamp, attributes); 436 } 437 438 @Override 439 public List<TRowResult> getRowWithColumnsTs(ByteBuffer tableName, ByteBuffer row, 440 List<ByteBuffer> columns, long timestamp, Map<ByteBuffer, ByteBuffer> attributes) 441 throws IOError { 442 443 Table table = null; 444 try { 445 table = getTable(tableName); 446 if (columns == null) { 447 Get get = new Get(getBytes(row)); 448 addAttributes(get, attributes); 449 get.setTimeRange(0, timestamp); 450 Result result = table.get(get); 451 return ThriftUtilities.rowResultFromHBase(result); 452 } 453 Get get = new Get(getBytes(row)); 454 addAttributes(get, attributes); 455 for (ByteBuffer column : columns) { 456 byte[][] famAndQf = CellUtil.parseColumn(getBytes(column)); 457 if (famAndQf.length == 1) { 458 get.addFamily(famAndQf[0]); 459 } else { 460 get.addColumn(famAndQf[0], famAndQf[1]); 461 } 462 } 463 get.setTimeRange(0, timestamp); 464 Result result = table.get(get); 465 return ThriftUtilities.rowResultFromHBase(result); 466 } catch (IOException e) { 467 LOG.warn(e.getMessage(), e); 468 throw getIOError(e); 469 } finally { 470 closeTable(table); 471 } 472 } 473 474 @Override 475 public List<TRowResult> getRows(ByteBuffer tableName, List<ByteBuffer> rows, 476 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 477 return getRowsWithColumnsTs(tableName, rows, null, HConstants.LATEST_TIMESTAMP, attributes); 478 } 479 480 @Override 481 public List<TRowResult> getRowsWithColumns(ByteBuffer tableName, List<ByteBuffer> rows, 482 List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 483 return getRowsWithColumnsTs(tableName, rows, columns, HConstants.LATEST_TIMESTAMP, attributes); 484 } 485 486 @Override 487 public List<TRowResult> getRowsTs(ByteBuffer tableName, List<ByteBuffer> rows, long timestamp, 488 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 489 return getRowsWithColumnsTs(tableName, rows, null, timestamp, attributes); 490 } 491 492 @Override 493 public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName, List<ByteBuffer> rows, 494 List<ByteBuffer> columns, long timestamp, Map<ByteBuffer, ByteBuffer> attributes) 495 throws IOError { 496 497 Table table = null; 498 try { 499 List<Get> gets = new ArrayList<>(rows.size()); 500 table = getTable(tableName); 501 if (metrics != null) { 502 metrics.incNumRowKeysInBatchGet(rows.size()); 503 } 504 for (ByteBuffer row : rows) { 505 Get get = new Get(getBytes(row)); 506 addAttributes(get, attributes); 507 if (columns != null) { 508 509 for (ByteBuffer column : columns) { 510 byte[][] famAndQf = CellUtil.parseColumn(getBytes(column)); 511 if (famAndQf.length == 1) { 512 get.addFamily(famAndQf[0]); 513 } else { 514 get.addColumn(famAndQf[0], famAndQf[1]); 515 } 516 } 517 } 518 get.setTimeRange(0, timestamp); 519 gets.add(get); 520 } 521 Result[] result = table.get(gets); 522 return ThriftUtilities.rowResultFromHBase(result); 523 } catch (IOException e) { 524 LOG.warn(e.getMessage(), e); 525 throw getIOError(e); 526 } finally { 527 closeTable(table); 528 } 529 } 530 531 @Override 532 public void deleteAll(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, 533 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 534 deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP, attributes); 535 } 536 537 @Override 538 public void deleteAllTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long timestamp, 539 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 540 Table table = null; 541 try { 542 table = getTable(tableName); 543 Delete delete = new Delete(getBytes(row)); 544 addAttributes(delete, attributes); 545 byte[][] famAndQf = CellUtil.parseColumn(getBytes(column)); 546 if (famAndQf.length == 1) { 547 delete.addFamily(famAndQf[0], timestamp); 548 } else { 549 delete.addColumns(famAndQf[0], famAndQf[1], timestamp); 550 } 551 table.delete(delete); 552 553 } catch (IOException e) { 554 LOG.warn(e.getMessage(), e); 555 throw getIOError(e); 556 } finally { 557 closeTable(table); 558 } 559 } 560 561 @Override 562 public void deleteAllRow(ByteBuffer tableName, ByteBuffer row, 563 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 564 deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes); 565 } 566 567 @Override 568 public void deleteAllRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp, 569 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 570 Table table = null; 571 try { 572 table = getTable(tableName); 573 Delete delete = new Delete(getBytes(row), timestamp); 574 addAttributes(delete, attributes); 575 table.delete(delete); 576 } catch (IOException e) { 577 LOG.warn(e.getMessage(), e); 578 throw getIOError(e); 579 } finally { 580 closeTable(table); 581 } 582 } 583 584 @Override 585 public void createTable(ByteBuffer in_tableName, List<ColumnDescriptor> columnFamilies) 586 throws IOError, IllegalArgument, AlreadyExists { 587 TableName tableName = getTableName(in_tableName); 588 try { 589 if (getAdmin().tableExists(tableName)) { 590 throw new AlreadyExists("table name already in use"); 591 } 592 HTableDescriptor desc = new HTableDescriptor(tableName); 593 for (ColumnDescriptor col : columnFamilies) { 594 HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col); 595 desc.addFamily(colDesc); 596 } 597 getAdmin().createTable(desc); 598 } catch (IOException e) { 599 LOG.warn(e.getMessage(), e); 600 throw getIOError(e); 601 } catch (IllegalArgumentException e) { 602 LOG.warn(e.getMessage(), e); 603 throw new IllegalArgument(Throwables.getStackTraceAsString(e)); 604 } 605 } 606 607 private static TableName getTableName(ByteBuffer buffer) { 608 return TableName.valueOf(getBytes(buffer)); 609 } 610 611 @Override 612 public void deleteTable(ByteBuffer in_tableName) throws IOError { 613 TableName tableName = getTableName(in_tableName); 614 if (LOG.isDebugEnabled()) { 615 LOG.debug("deleteTable: table={}", tableName); 616 } 617 try { 618 if (!getAdmin().tableExists(tableName)) { 619 throw new IOException("table does not exist"); 620 } 621 getAdmin().deleteTable(tableName); 622 } catch (IOException e) { 623 LOG.warn(e.getMessage(), e); 624 throw getIOError(e); 625 } 626 } 627 628 @Override 629 public void mutateRow(ByteBuffer tableName, ByteBuffer row, List<Mutation> mutations, 630 Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument { 631 mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes); 632 } 633 634 @Override 635 public void mutateRowTs(ByteBuffer tableName, ByteBuffer row, List<Mutation> mutations, 636 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument { 637 Table table = null; 638 try { 639 table = getTable(tableName); 640 Put put = new Put(getBytes(row), timestamp); 641 addAttributes(put, attributes); 642 643 Delete delete = new Delete(getBytes(row)); 644 addAttributes(delete, attributes); 645 if (metrics != null) { 646 metrics.incNumRowKeysInBatchMutate(mutations.size()); 647 } 648 649 // I apologize for all this mess :) 650 CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); 651 for (Mutation m : mutations) { 652 byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column)); 653 if (m.isDelete) { 654 if (famAndQf.length == 1) { 655 delete.addFamily(famAndQf[0], timestamp); 656 } else { 657 delete.addColumns(famAndQf[0], famAndQf[1], timestamp); 658 } 659 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 660 } else { 661 if (famAndQf.length == 1) { 662 LOG.warn("No column qualifier specified. Delete is the only mutation supported " 663 + "over the whole column family."); 664 } else { 665 put.add(builder.clear().setRow(put.getRow()).setFamily(famAndQf[0]) 666 .setQualifier(famAndQf[1]).setTimestamp(put.getTimestamp()).setType(Cell.Type.Put) 667 .setValue(m.value != null ? getBytes(m.value) : HConstants.EMPTY_BYTE_ARRAY).build()); 668 } 669 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 670 } 671 } 672 if (!delete.isEmpty()) { 673 table.delete(delete); 674 } 675 if (!put.isEmpty()) { 676 table.put(put); 677 } 678 } catch (IOException e) { 679 LOG.warn(e.getMessage(), e); 680 throw getIOError(e); 681 } catch (IllegalArgumentException e) { 682 LOG.warn(e.getMessage(), e); 683 throw new IllegalArgument(Throwables.getStackTraceAsString(e)); 684 } finally { 685 closeTable(table); 686 } 687 } 688 689 @Override 690 public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches, 691 Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument, TException { 692 mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes); 693 } 694 695 @Override 696 public void mutateRowsTs(ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp, 697 Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument, TException { 698 List<Put> puts = new ArrayList<>(); 699 List<Delete> deletes = new ArrayList<>(); 700 CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); 701 for (BatchMutation batch : rowBatches) { 702 byte[] row = getBytes(batch.row); 703 List<Mutation> mutations = batch.mutations; 704 Delete delete = new Delete(row); 705 addAttributes(delete, attributes); 706 Put put = new Put(row, timestamp); 707 addAttributes(put, attributes); 708 for (Mutation m : mutations) { 709 byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column)); 710 if (m.isDelete) { 711 // no qualifier, family only. 712 if (famAndQf.length == 1) { 713 delete.addFamily(famAndQf[0], timestamp); 714 } else { 715 delete.addColumns(famAndQf[0], famAndQf[1], timestamp); 716 } 717 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 718 } else { 719 if (famAndQf.length == 1) { 720 LOG.warn("No column qualifier specified. Delete is the only mutation supported " 721 + "over the whole column family."); 722 } 723 if (famAndQf.length == 2) { 724 try { 725 put.add(builder.clear().setRow(put.getRow()).setFamily(famAndQf[0]) 726 .setQualifier(famAndQf[1]).setTimestamp(put.getTimestamp()).setType(Cell.Type.Put) 727 .setValue(m.value != null ? getBytes(m.value) : HConstants.EMPTY_BYTE_ARRAY) 728 .build()); 729 } catch (IOException e) { 730 throw new IllegalArgumentException(e); 731 } 732 } else { 733 throw new IllegalArgumentException("Invalid famAndQf provided."); 734 } 735 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 736 } 737 } 738 if (!delete.isEmpty()) { 739 deletes.add(delete); 740 } 741 if (!put.isEmpty()) { 742 puts.add(put); 743 } 744 } 745 746 Table table = null; 747 try { 748 table = getTable(tableName); 749 if (!puts.isEmpty()) { 750 table.put(puts); 751 } 752 if (!deletes.isEmpty()) { 753 table.delete(deletes); 754 } 755 } catch (IOException e) { 756 LOG.warn(e.getMessage(), e); 757 throw getIOError(e); 758 } catch (IllegalArgumentException e) { 759 LOG.warn(e.getMessage(), e); 760 throw new IllegalArgument(Throwables.getStackTraceAsString(e)); 761 } finally { 762 closeTable(table); 763 } 764 } 765 766 @Override 767 public long atomicIncrement(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount) 768 throws IOError, IllegalArgument, TException { 769 byte[][] famAndQf = CellUtil.parseColumn(getBytes(column)); 770 if (famAndQf.length == 1) { 771 return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount); 772 } 773 return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount); 774 } 775 776 protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, byte[] family, 777 byte[] qualifier, long amount) throws IOError, IllegalArgument, TException { 778 Table table = null; 779 try { 780 table = getTable(tableName); 781 return table.incrementColumnValue(getBytes(row), family, qualifier, amount); 782 } catch (IOException e) { 783 LOG.warn(e.getMessage(), e); 784 throw getIOError(e); 785 } finally { 786 closeTable(table); 787 } 788 } 789 790 @Override 791 public void scannerClose(int id) throws IOError, IllegalArgument { 792 LOG.debug("scannerClose: id={}", id); 793 ResultScannerWrapper resultScannerWrapper = getScanner(id); 794 if (resultScannerWrapper == null) { 795 LOG.warn("scanner ID is invalid"); 796 throw new IllegalArgument("scanner ID is invalid"); 797 } 798 resultScannerWrapper.getScanner().close(); 799 removeScanner(id); 800 } 801 802 @Override 803 public List<TRowResult> scannerGetList(int id, int nbRows) throws IllegalArgument, IOError { 804 LOG.debug("scannerGetList: id={}", id); 805 ResultScannerWrapper resultScannerWrapper = getScanner(id); 806 if (null == resultScannerWrapper) { 807 String message = "scanner ID is invalid"; 808 LOG.warn(message); 809 throw new IllegalArgument("scanner ID is invalid"); 810 } 811 812 Result[] results; 813 try { 814 results = resultScannerWrapper.getScanner().next(nbRows); 815 if (null == results) { 816 return new ArrayList<>(); 817 } 818 } catch (IOException e) { 819 LOG.warn(e.getMessage(), e); 820 throw getIOError(e); 821 } finally { 822 // Add scanner back to scannerMap; protects against case 823 // where scanner expired during processing of request. 824 scannerMap.put(id, resultScannerWrapper); 825 } 826 return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted()); 827 } 828 829 @Override 830 public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError { 831 return scannerGetList(id, 1); 832 } 833 834 @Override 835 public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan, 836 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 837 838 Table table = null; 839 try { 840 table = getTable(tableName); 841 Scan scan = new Scan(); 842 addAttributes(scan, attributes); 843 if (tScan.isSetStartRow()) { 844 scan.setStartRow(tScan.getStartRow()); 845 } 846 if (tScan.isSetStopRow()) { 847 scan.setStopRow(tScan.getStopRow()); 848 } 849 if (tScan.isSetTimestamp()) { 850 scan.setTimeRange(0, tScan.getTimestamp()); 851 } 852 if (tScan.isSetCaching()) { 853 scan.setCaching(tScan.getCaching()); 854 } 855 if (tScan.isSetBatchSize()) { 856 scan.setBatch(tScan.getBatchSize()); 857 } 858 if (tScan.isSetColumns() && !tScan.getColumns().isEmpty()) { 859 for (ByteBuffer column : tScan.getColumns()) { 860 byte[][] famQf = CellUtil.parseColumn(getBytes(column)); 861 if (famQf.length == 1) { 862 scan.addFamily(famQf[0]); 863 } else { 864 scan.addColumn(famQf[0], famQf[1]); 865 } 866 } 867 } 868 if (tScan.isSetFilterString()) { 869 ParseFilter parseFilter = new ParseFilter(); 870 scan.setFilter(parseFilter.parseFilterString(tScan.getFilterString())); 871 } 872 if (tScan.isSetReversed()) { 873 scan.setReversed(tScan.isReversed()); 874 } 875 if (tScan.isSetCacheBlocks()) { 876 scan.setCacheBlocks(tScan.isCacheBlocks()); 877 } 878 return addScanner(table.getScanner(scan), tScan.sortColumns); 879 } catch (IOException e) { 880 LOG.warn(e.getMessage(), e); 881 throw getIOError(e); 882 } finally { 883 closeTable(table); 884 } 885 } 886 887 @Override 888 public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, List<ByteBuffer> columns, 889 Map<ByteBuffer, ByteBuffer> attributes) throws IOError { 890 891 Table table = null; 892 try { 893 table = getTable(tableName); 894 Scan scan = new Scan(getBytes(startRow)); 895 addAttributes(scan, attributes); 896 if (columns != null && !columns.isEmpty()) { 897 for (ByteBuffer column : columns) { 898 byte[][] famQf = CellUtil.parseColumn(getBytes(column)); 899 if (famQf.length == 1) { 900 scan.addFamily(famQf[0]); 901 } else { 902 scan.addColumn(famQf[0], famQf[1]); 903 } 904 } 905 } 906 return addScanner(table.getScanner(scan), false); 907 } catch (IOException e) { 908 LOG.warn(e.getMessage(), e); 909 throw getIOError(e); 910 } finally { 911 closeTable(table); 912 } 913 } 914 915 @Override 916 public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow, ByteBuffer stopRow, 917 List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException { 918 919 Table table = null; 920 try { 921 table = getTable(tableName); 922 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); 923 addAttributes(scan, attributes); 924 if (columns != null && !columns.isEmpty()) { 925 for (ByteBuffer column : columns) { 926 byte[][] famQf = CellUtil.parseColumn(getBytes(column)); 927 if (famQf.length == 1) { 928 scan.addFamily(famQf[0]); 929 } else { 930 scan.addColumn(famQf[0], famQf[1]); 931 } 932 } 933 } 934 return addScanner(table.getScanner(scan), false); 935 } catch (IOException e) { 936 LOG.warn(e.getMessage(), e); 937 throw getIOError(e); 938 } finally { 939 closeTable(table); 940 } 941 } 942 943 @Override 944 public int scannerOpenWithPrefix(ByteBuffer tableName, ByteBuffer startAndPrefix, 945 List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException { 946 947 Table table = null; 948 try { 949 table = getTable(tableName); 950 Scan scan = new Scan(getBytes(startAndPrefix)); 951 addAttributes(scan, attributes); 952 Filter f = new WhileMatchFilter(new PrefixFilter(getBytes(startAndPrefix))); 953 scan.setFilter(f); 954 if (columns != null && !columns.isEmpty()) { 955 for (ByteBuffer column : columns) { 956 byte[][] famQf = CellUtil.parseColumn(getBytes(column)); 957 if (famQf.length == 1) { 958 scan.addFamily(famQf[0]); 959 } else { 960 scan.addColumn(famQf[0], famQf[1]); 961 } 962 } 963 } 964 return addScanner(table.getScanner(scan), false); 965 } catch (IOException e) { 966 LOG.warn(e.getMessage(), e); 967 throw getIOError(e); 968 } finally { 969 closeTable(table); 970 } 971 } 972 973 @Override 974 public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow, List<ByteBuffer> columns, 975 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException { 976 977 Table table = null; 978 try { 979 table = getTable(tableName); 980 Scan scan = new Scan(getBytes(startRow)); 981 addAttributes(scan, attributes); 982 scan.setTimeRange(0, timestamp); 983 if (columns != null && !columns.isEmpty()) { 984 for (ByteBuffer column : columns) { 985 byte[][] famQf = CellUtil.parseColumn(getBytes(column)); 986 if (famQf.length == 1) { 987 scan.addFamily(famQf[0]); 988 } else { 989 scan.addColumn(famQf[0], famQf[1]); 990 } 991 } 992 } 993 return addScanner(table.getScanner(scan), false); 994 } catch (IOException e) { 995 LOG.warn(e.getMessage(), e); 996 throw getIOError(e); 997 } finally { 998 closeTable(table); 999 } 1000 } 1001 1002 @Override 1003 public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow, ByteBuffer stopRow, 1004 List<ByteBuffer> columns, long timestamp, Map<ByteBuffer, ByteBuffer> attributes) 1005 throws IOError, TException { 1006 1007 Table table = null; 1008 try { 1009 table = getTable(tableName); 1010 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); 1011 addAttributes(scan, attributes); 1012 scan.setTimeRange(0, timestamp); 1013 if (columns != null && !columns.isEmpty()) { 1014 for (ByteBuffer column : columns) { 1015 byte[][] famQf = CellUtil.parseColumn(getBytes(column)); 1016 if (famQf.length == 1) { 1017 scan.addFamily(famQf[0]); 1018 } else { 1019 scan.addColumn(famQf[0], famQf[1]); 1020 } 1021 } 1022 } 1023 scan.setTimeRange(0, timestamp); 1024 return addScanner(table.getScanner(scan), false); 1025 } catch (IOException e) { 1026 LOG.warn(e.getMessage(), e); 1027 throw getIOError(e); 1028 } finally { 1029 closeTable(table); 1030 } 1031 } 1032 1033 @Override 1034 public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(ByteBuffer tableName) 1035 throws IOError, TException { 1036 1037 Table table = null; 1038 try { 1039 TreeMap<ByteBuffer, ColumnDescriptor> columns = new TreeMap<>(); 1040 1041 table = getTable(tableName); 1042 HTableDescriptor desc = table.getTableDescriptor(); 1043 1044 for (HColumnDescriptor e : desc.getFamilies()) { 1045 ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e); 1046 columns.put(col.name, col); 1047 } 1048 return columns; 1049 } catch (IOException e) { 1050 LOG.warn(e.getMessage(), e); 1051 throw getIOError(e); 1052 } finally { 1053 closeTable(table); 1054 } 1055 } 1056 1057 private void closeTable(Table table) throws IOError { 1058 try { 1059 if (table != null) { 1060 table.close(); 1061 } 1062 } catch (IOException e) { 1063 LOG.error(e.getMessage(), e); 1064 throw getIOError(e); 1065 } 1066 } 1067 1068 @Override 1069 public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError { 1070 try { 1071 byte[] row = getBytes(searchRow); 1072 Result startRowResult = 1073 getReverseScanResult(TableName.META_TABLE_NAME.getName(), row, HConstants.CATALOG_FAMILY); 1074 1075 if (startRowResult == null) { 1076 throw new IOException( 1077 "Cannot find row in " + TableName.META_TABLE_NAME + ", row=" + Bytes.toStringBinary(row)); 1078 } 1079 1080 // find region start and end keys 1081 RegionInfo regionInfo = MetaTableAccessor.getRegionInfo(startRowResult); 1082 if (regionInfo == null) { 1083 throw new IOException("RegionInfo REGIONINFO was null or " + " empty in Meta for row=" 1084 + Bytes.toStringBinary(row)); 1085 } 1086 TRegionInfo region = new TRegionInfo(); 1087 region.setStartKey(regionInfo.getStartKey()); 1088 region.setEndKey(regionInfo.getEndKey()); 1089 region.id = regionInfo.getRegionId(); 1090 region.setName(regionInfo.getRegionName()); 1091 region.version = HREGION_VERSION; // version not used anymore, PB encoding used. 1092 1093 // find region assignment to server 1094 ServerName serverName = MetaTableAccessor.getServerName(startRowResult, 0); 1095 if (serverName != null) { 1096 region.setServerName(Bytes.toBytes(serverName.getHostname())); 1097 region.port = serverName.getPort(); 1098 } 1099 return region; 1100 } catch (IOException e) { 1101 LOG.warn(e.getMessage(), e); 1102 throw getIOError(e); 1103 } 1104 } 1105 1106 private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family) 1107 throws IOException { 1108 Scan scan = new Scan(row); 1109 scan.setReversed(true); 1110 scan.addFamily(family); 1111 scan.setStartRow(row); 1112 try (Table table = getTable(tableName); ResultScanner scanner = table.getScanner(scan)) { 1113 return scanner.next(); 1114 } 1115 } 1116 1117 @Override 1118 public void increment(TIncrement tincrement) throws IOError, TException { 1119 1120 if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) { 1121 throw new TException("Must supply a table and a row key; can't increment"); 1122 } 1123 1124 if (conf.getBoolean(COALESCE_INC_KEY, false)) { 1125 this.coalescer.queueIncrement(tincrement); 1126 return; 1127 } 1128 1129 Table table = null; 1130 try { 1131 table = getTable(tincrement.getTable()); 1132 Increment inc = ThriftUtilities.incrementFromThrift(tincrement); 1133 table.increment(inc); 1134 } catch (IOException e) { 1135 LOG.warn(e.getMessage(), e); 1136 throw getIOError(e); 1137 } finally { 1138 closeTable(table); 1139 } 1140 } 1141 1142 @Override 1143 public void incrementRows(List<TIncrement> tincrements) throws IOError, TException { 1144 if (conf.getBoolean(COALESCE_INC_KEY, false)) { 1145 this.coalescer.queueIncrements(tincrements); 1146 return; 1147 } 1148 for (TIncrement tinc : tincrements) { 1149 increment(tinc); 1150 } 1151 } 1152 1153 @Override 1154 public List<TCell> append(TAppend tappend) throws IOError, TException { 1155 if (tappend.getRow().length == 0 || tappend.getTable().length == 0) { 1156 throw new TException("Must supply a table and a row key; can't append"); 1157 } 1158 1159 Table table = null; 1160 try { 1161 table = getTable(tappend.getTable()); 1162 Append append = ThriftUtilities.appendFromThrift(tappend); 1163 Result result = table.append(append); 1164 return ThriftUtilities.cellFromHBase(result.rawCells()); 1165 } catch (IOException e) { 1166 LOG.warn(e.getMessage(), e); 1167 throw getIOError(e); 1168 } finally { 1169 closeTable(table); 1170 } 1171 } 1172 1173 @Override 1174 public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, 1175 ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) 1176 throws IOError, IllegalArgument, TException { 1177 Put put; 1178 try { 1179 put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP); 1180 addAttributes(put, attributes); 1181 1182 byte[][] famAndQf = CellUtil.parseColumn(getBytes(mput.column)); 1183 put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow()) 1184 .setFamily(famAndQf[0]).setQualifier(famAndQf[1]).setTimestamp(put.getTimestamp()) 1185 .setType(Cell.Type.Put) 1186 .setValue(mput.value != null ? getBytes(mput.value) : HConstants.EMPTY_BYTE_ARRAY).build()); 1187 put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1188 } catch (IOException | IllegalArgumentException e) { 1189 LOG.warn(e.getMessage(), e); 1190 throw new IllegalArgument(Throwables.getStackTraceAsString(e)); 1191 } 1192 1193 Table table = null; 1194 try { 1195 table = getTable(tableName); 1196 byte[][] famAndQf = CellUtil.parseColumn(getBytes(column)); 1197 Table.CheckAndMutateBuilder mutateBuilder = 1198 table.checkAndMutate(getBytes(row), famAndQf[0]).qualifier(famAndQf[1]); 1199 if (value != null) { 1200 return mutateBuilder.ifEquals(getBytes(value)).thenPut(put); 1201 } else { 1202 return mutateBuilder.ifNotExists().thenPut(put); 1203 } 1204 } catch (IOException e) { 1205 LOG.warn(e.getMessage(), e); 1206 throw getIOError(e); 1207 } catch (IllegalArgumentException e) { 1208 LOG.warn(e.getMessage(), e); 1209 throw new IllegalArgument(Throwables.getStackTraceAsString(e)); 1210 } finally { 1211 closeTable(table); 1212 } 1213 } 1214 1215 @Override 1216 public TThriftServerType getThriftServerType() { 1217 return TThriftServerType.ONE; 1218 } 1219 1220 @Override 1221 public String getClusterId() throws TException { 1222 return connectionCache.getClusterId(); 1223 } 1224 1225 private static IOError getIOError(Throwable throwable) { 1226 IOError error = new IOErrorWithCause(throwable); 1227 error.setCanRetry(!(throwable instanceof DoNotRetryIOException)); 1228 error.setMessage(Throwables.getStackTraceAsString(throwable)); 1229 return error; 1230 } 1231 1232 /** 1233 * Adds all the attributes into the Operation object 1234 */ 1235 private static void addAttributes(OperationWithAttributes op, 1236 Map<ByteBuffer, ByteBuffer> attributes) { 1237 if (attributes == null || attributes.isEmpty()) { 1238 return; 1239 } 1240 for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) { 1241 String name = Bytes.toStringBinary(getBytes(entry.getKey())); 1242 byte[] value = getBytes(entry.getValue()); 1243 op.setAttribute(name, value); 1244 } 1245 } 1246 1247 protected static class ResultScannerWrapper { 1248 1249 private final ResultScanner scanner; 1250 private final boolean sortColumns; 1251 1252 public ResultScannerWrapper(ResultScanner resultScanner, boolean sortResultColumns) { 1253 scanner = resultScanner; 1254 sortColumns = sortResultColumns; 1255 } 1256 1257 public ResultScanner getScanner() { 1258 return scanner; 1259 } 1260 1261 public boolean isColumnSorted() { 1262 return sortColumns; 1263 } 1264 } 1265 1266 public static class IOErrorWithCause extends IOError { 1267 private final Throwable cause; 1268 1269 public IOErrorWithCause(Throwable cause) { 1270 this.cause = cause; 1271 } 1272 1273 @Override 1274 public synchronized Throwable getCause() { 1275 return cause; 1276 } 1277 1278 @Override 1279 public boolean equals(Object other) { 1280 if (super.equals(other) && other instanceof IOErrorWithCause) { 1281 Throwable otherCause = ((IOErrorWithCause) other).getCause(); 1282 if (this.getCause() != null) { 1283 return otherCause != null && this.getCause().equals(otherCause); 1284 } else { 1285 return otherCause == null; 1286 } 1287 } 1288 return false; 1289 } 1290 1291 @Override 1292 public int hashCode() { 1293 int result = super.hashCode(); 1294 result = 31 * result + (cause != null ? cause.hashCode() : 0); 1295 return result; 1296 } 1297 } 1298 1299}