001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rest.client;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.io.UnsupportedEncodingException;
023import java.net.URLEncoder;
024import java.nio.charset.StandardCharsets;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collection;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.TreeMap;
033import java.util.concurrent.TimeUnit;
034import org.apache.commons.lang3.NotImplementedException;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.CompareOperator;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Append;
044import org.apache.hadoop.hbase.client.CheckAndMutate;
045import org.apache.hadoop.hbase.client.CheckAndMutateResult;
046import org.apache.hadoop.hbase.client.Delete;
047import org.apache.hadoop.hbase.client.Durability;
048import org.apache.hadoop.hbase.client.Get;
049import org.apache.hadoop.hbase.client.Increment;
050import org.apache.hadoop.hbase.client.Put;
051import org.apache.hadoop.hbase.client.RegionLocator;
052import org.apache.hadoop.hbase.client.Result;
053import org.apache.hadoop.hbase.client.ResultScanner;
054import org.apache.hadoop.hbase.client.Row;
055import org.apache.hadoop.hbase.client.RowMutations;
056import org.apache.hadoop.hbase.client.Scan;
057import org.apache.hadoop.hbase.client.Table;
058import org.apache.hadoop.hbase.client.TableDescriptor;
059import org.apache.hadoop.hbase.client.coprocessor.Batch;
060import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
061import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
062import org.apache.hadoop.hbase.filter.Filter;
063import org.apache.hadoop.hbase.io.TimeRange;
064import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
065import org.apache.hadoop.hbase.rest.Constants;
066import org.apache.hadoop.hbase.rest.model.CellModel;
067import org.apache.hadoop.hbase.rest.model.CellSetModel;
068import org.apache.hadoop.hbase.rest.model.RowModel;
069import org.apache.hadoop.hbase.rest.model.ScannerModel;
070import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
071import org.apache.hadoop.hbase.util.Bytes;
072import org.apache.hadoop.util.StringUtils;
073import org.apache.yetus.audience.InterfaceAudience;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
078import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
079import org.apache.hbase.thirdparty.com.google.protobuf.Message;
080import org.apache.hbase.thirdparty.com.google.protobuf.Service;
081import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
082
083/**
084 * HTable interface to remote tables accessed via REST gateway
085 */
086@InterfaceAudience.Private
087public class RemoteHTable implements Table {
088
  /** Logger shared by this class and its inner classes. */
  private static final Logger LOG = LoggerFactory.getLogger(RemoteHTable.class);

  /** REST client through which every request of this table is issued. */
  final Client client;
  /** Configuration supplied at construction time; handed back by getConfiguration(). */
  final Configuration conf;
  /** Table name as raw bytes. */
  final byte[] name;
  /** Number of attempts made per request; read from "hbase.rest.client.max.retries". */
  final int maxRetries;
  /** Milliseconds slept after a 509 response; read from "hbase.rest.client.sleep". */
  final long sleepTime;
  /** Prefix of every request path; defaults to "/" and always ends with '/'. */
  private String pathPrefix = "/";
097
098  @SuppressWarnings("rawtypes")
099  protected String buildRowSpec(final byte[] row, final Map familyMap, final long startTime,
100    final long endTime, final int maxVersions) {
101    StringBuilder sb = new StringBuilder();
102    sb.append(pathPrefix);
103    sb.append(Bytes.toString(name));
104    sb.append('/');
105    sb.append(toURLEncodedBytes(row));
106    Set families = familyMap.entrySet();
107    if (families != null) {
108      Iterator i = familyMap.entrySet().iterator();
109      sb.append('/');
110      while (i.hasNext()) {
111        Map.Entry e = (Map.Entry) i.next();
112        Collection quals = (Collection) e.getValue();
113        if (quals == null || quals.isEmpty()) {
114          // this is an unqualified family. append the family name and NO ':'
115          sb.append(toURLEncodedBytes((byte[]) e.getKey()));
116        } else {
117          Iterator ii = quals.iterator();
118          while (ii.hasNext()) {
119            sb.append(toURLEncodedBytes((byte[]) e.getKey()));
120            Object o = ii.next();
121            // Puts use byte[] but Deletes use KeyValue
122            if (o instanceof byte[]) {
123              sb.append(':');
124              sb.append(toURLEncodedBytes((byte[]) o));
125            } else if (o instanceof KeyValue) {
126              if (((KeyValue) o).getQualifierLength() != 0) {
127                sb.append(':');
128                sb.append(toURLEncodedBytes(CellUtil.cloneQualifier((KeyValue) o)));
129              }
130            } else {
131              throw new RuntimeException("object type not handled");
132            }
133            if (ii.hasNext()) {
134              sb.append(',');
135            }
136          }
137        }
138        if (i.hasNext()) {
139          sb.append(',');
140        }
141      }
142    }
143    if (startTime >= 0 && endTime != Long.MAX_VALUE) {
144      sb.append('/');
145      sb.append(startTime);
146      if (startTime != endTime) {
147        sb.append(',');
148        sb.append(endTime);
149      }
150    } else if (endTime != Long.MAX_VALUE) {
151      sb.append('/');
152      sb.append(endTime);
153    }
154    if (maxVersions > 1) {
155      sb.append("?v=");
156      sb.append(maxVersions);
157    }
158    return sb.toString();
159  }
160
161  protected String buildMultiRowSpec(final byte[][] rows, int maxVersions) {
162    StringBuilder sb = new StringBuilder();
163    sb.append(pathPrefix);
164    sb.append(Bytes.toString(name));
165    sb.append("/multiget/");
166    if (rows == null || rows.length == 0) {
167      return sb.toString();
168    }
169    sb.append("?");
170    for (int i = 0; i < rows.length; i++) {
171      byte[] rk = rows[i];
172      if (i != 0) {
173        sb.append('&');
174      }
175      sb.append("row=");
176      sb.append(toURLEncodedBytes(rk));
177    }
178    sb.append("&v=");
179    sb.append(maxVersions);
180
181    return sb.toString();
182  }
183
184  protected Result[] buildResultFromModel(final CellSetModel model) {
185    List<Result> results = new ArrayList<>();
186    for (RowModel row : model.getRows()) {
187      List<Cell> kvs = new ArrayList<>(row.getCells().size());
188      for (CellModel cell : row.getCells()) {
189        byte[][] split = CellUtil.parseColumn(cell.getColumn());
190        byte[] column = split[0];
191        byte[] qualifier = null;
192        if (split.length == 1) {
193          qualifier = HConstants.EMPTY_BYTE_ARRAY;
194        } else if (split.length == 2) {
195          qualifier = split[1];
196        } else {
197          throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
198        }
199        kvs
200          .add(new KeyValue(row.getKey(), column, qualifier, cell.getTimestamp(), cell.getValue()));
201      }
202      results.add(Result.create(kvs));
203    }
204    return results.toArray(new Result[results.size()]);
205  }
206
207  protected CellSetModel buildModelFromPut(Put put) {
208    RowModel row = new RowModel(put.getRow());
209    long ts = put.getTimestamp();
210    for (List<Cell> cells : put.getFamilyCellMap().values()) {
211      for (Cell cell : cells) {
212        row.addCell(new CellModel(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
213          ts != HConstants.LATEST_TIMESTAMP ? ts : cell.getTimestamp(), CellUtil.cloneValue(cell)));
214      }
215    }
216    CellSetModel model = new CellSetModel();
217    model.addRow(row);
218    return model;
219  }
220
221  /**
222   * Constructor
223   */
224  public RemoteHTable(Client client, String name) {
225    this(client, HBaseConfiguration.create(), Bytes.toBytes(name));
226  }
227
228  /**
229   * Constructor
230   */
231  public RemoteHTable(Client client, Configuration conf, String name) {
232    this(client, conf, Bytes.toBytes(name));
233  }
234
235  /**
236   * Constructor
237   */
238  public RemoteHTable(Client client, Configuration conf, byte[] name) {
239    this.client = client;
240    this.conf = conf;
241    this.name = name;
242    this.maxRetries = conf.getInt("hbase.rest.client.max.retries", 10);
243    this.sleepTime = conf.getLong("hbase.rest.client.sleep", 1000);
244  }
245
246  public RemoteHTable(Client client, Configuration conf, byte[] name, String pathPrefix) {
247    this(client, conf, name);
248    this.pathPrefix = pathPrefix + "/";
249  }
250
251  public byte[] getTableName() {
252    return name.clone();
253  }
254
255  @Override
256  public TableName getName() {
257    return TableName.valueOf(name);
258  }
259
260  @Override
261  public Configuration getConfiguration() {
262    return conf;
263  }
264
265  @Override
266  public void close() throws IOException {
267    client.shutdown();
268  }
269
270  @Override
271  public Result get(Get get) throws IOException {
272    TimeRange range = get.getTimeRange();
273    String spec = buildRowSpec(get.getRow(), get.getFamilyMap(), range.getMin(), range.getMax(),
274      get.getMaxVersions());
275    if (get.getFilter() != null) {
276      LOG.warn("filters not supported on gets");
277    }
278    Result[] results = getResults(spec);
279    if (results.length > 0) {
280      if (results.length > 1) {
281        LOG.warn("too many results for get (" + results.length + ")");
282      }
283      return results[0];
284    } else {
285      return new Result();
286    }
287  }
288
289  @Override
290  public Result[] get(List<Get> gets) throws IOException {
291    byte[][] rows = new byte[gets.size()][];
292    int maxVersions = 1;
293    int count = 0;
294
295    for (Get g : gets) {
296
297      if (count == 0) {
298        maxVersions = g.getMaxVersions();
299      } else if (g.getMaxVersions() != maxVersions) {
300        LOG.warn(
301          "MaxVersions on Gets do not match, using the first in the list (" + maxVersions + ")");
302      }
303
304      if (g.getFilter() != null) {
305        LOG.warn("filters not supported on gets");
306      }
307
308      rows[count] = g.getRow();
309      count++;
310    }
311
312    String spec = buildMultiRowSpec(rows, maxVersions);
313
314    return getResults(spec);
315  }
316
317  private Result[] getResults(String spec) throws IOException {
318    for (int i = 0; i < maxRetries; i++) {
319      Response response = client.get(spec, Constants.MIMETYPE_PROTOBUF);
320      int code = response.getCode();
321      switch (code) {
322        case 200:
323          CellSetModel model = new CellSetModel();
324          model.getObjectFromMessage(response.getBody());
325          Result[] results = buildResultFromModel(model);
326          if (results.length > 0) {
327            return results;
328          }
329          // fall through
330        case 404:
331          return new Result[0];
332
333        case 509:
334          try {
335            Thread.sleep(sleepTime);
336          } catch (InterruptedException e) {
337            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
338          }
339          break;
340        default:
341          throw new IOException("get request returned " + code);
342      }
343    }
344    throw new IOException("get request timed out");
345  }
346
347  @Override
348  public boolean exists(Get get) throws IOException {
349    LOG.warn("exists() is really get(), just use get()");
350    Result result = get(get);
351    return (result != null && !(result.isEmpty()));
352  }
353
354  @Override
355  public boolean[] exists(List<Get> gets) throws IOException {
356    LOG.warn("exists(List<Get>) is really list of get() calls, just use get()");
357    boolean[] results = new boolean[gets.size()];
358    for (int i = 0; i < results.length; i++) {
359      results[i] = exists(gets.get(i));
360    }
361    return results;
362  }
363
364  @Override
365  public void put(Put put) throws IOException {
366    CellSetModel model = buildModelFromPut(put);
367    StringBuilder sb = new StringBuilder();
368    sb.append(pathPrefix);
369    sb.append(Bytes.toString(name));
370    sb.append('/');
371    sb.append(toURLEncodedBytes(put.getRow()));
372    for (int i = 0; i < maxRetries; i++) {
373      Response response =
374        client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
375      int code = response.getCode();
376      switch (code) {
377        case 200:
378          return;
379        case 509:
380          try {
381            Thread.sleep(sleepTime);
382          } catch (InterruptedException e) {
383            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
384          }
385          break;
386        default:
387          throw new IOException("put request failed with " + code);
388      }
389    }
390    throw new IOException("put request timed out");
391  }
392
393  @Override
394  public void put(List<Put> puts) throws IOException {
395    // this is a trick: The gateway accepts multiple rows in a cell set and
396    // ignores the row specification in the URI
397
398    // separate puts by row
399    TreeMap<byte[], List<Cell>> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
400    for (Put put : puts) {
401      byte[] row = put.getRow();
402      List<Cell> cells = map.get(row);
403      if (cells == null) {
404        cells = new ArrayList<>();
405        map.put(row, cells);
406      }
407      for (List<Cell> l : put.getFamilyCellMap().values()) {
408        cells.addAll(l);
409      }
410    }
411
412    // build the cell set
413    CellSetModel model = new CellSetModel();
414    for (Map.Entry<byte[], List<Cell>> e : map.entrySet()) {
415      RowModel row = new RowModel(e.getKey());
416      for (Cell cell : e.getValue()) {
417        row.addCell(new CellModel(cell));
418      }
419      model.addRow(row);
420    }
421
422    // build path for multiput
423    StringBuilder sb = new StringBuilder();
424    sb.append(pathPrefix);
425    sb.append(Bytes.toString(name));
426    sb.append("/$multiput"); // can be any nonexistent row
427    for (int i = 0; i < maxRetries; i++) {
428      Response response =
429        client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
430      int code = response.getCode();
431      switch (code) {
432        case 200:
433          return;
434        case 509:
435          try {
436            Thread.sleep(sleepTime);
437          } catch (InterruptedException e) {
438            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
439          }
440          break;
441        default:
442          throw new IOException("multiput request failed with " + code);
443      }
444    }
445    throw new IOException("multiput request timed out");
446  }
447
448  @Override
449  public void delete(Delete delete) throws IOException {
450    String spec = buildRowSpec(delete.getRow(), delete.getFamilyCellMap(), delete.getTimestamp(),
451      delete.getTimestamp(), 1);
452    for (int i = 0; i < maxRetries; i++) {
453      Response response = client.delete(spec);
454      int code = response.getCode();
455      switch (code) {
456        case 200:
457          return;
458        case 509:
459          try {
460            Thread.sleep(sleepTime);
461          } catch (InterruptedException e) {
462            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
463          }
464          break;
465        default:
466          throw new IOException("delete request failed with " + code);
467      }
468    }
469    throw new IOException("delete request timed out");
470  }
471
472  @Override
473  public void delete(List<Delete> deletes) throws IOException {
474    for (Delete delete : deletes) {
475      delete(delete);
476    }
477  }
478
479  public void flushCommits() throws IOException {
480    // no-op
481  }
482
483  @Override
484  public TableDescriptor getDescriptor() throws IOException {
485    StringBuilder sb = new StringBuilder();
486    sb.append(pathPrefix);
487    sb.append(Bytes.toString(name));
488    sb.append('/');
489    sb.append("schema");
490    for (int i = 0; i < maxRetries; i++) {
491      Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF);
492      int code = response.getCode();
493      switch (code) {
494        case 200:
495          TableSchemaModel schema = new TableSchemaModel();
496          schema.getObjectFromMessage(response.getBody());
497          return schema.getTableDescriptor();
498        case 509:
499          try {
500            Thread.sleep(sleepTime);
501          } catch (InterruptedException e) {
502            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
503          }
504          break;
505        default:
506          throw new IOException("schema request returned " + code);
507      }
508    }
509    throw new IOException("schema request timed out");
510  }
511
512  class Scanner implements ResultScanner {
513
514    String uri;
515    private Result[] cachedResults;
516    private int nextCachedResultsRow = 0;
517
518    public Scanner(Scan scan) throws IOException {
519      ScannerModel model;
520      try {
521        model = ScannerModel.fromScan(scan);
522      } catch (Exception e) {
523        throw new IOException(e);
524      }
525      StringBuilder sb = new StringBuilder();
526      sb.append(pathPrefix);
527      sb.append(Bytes.toString(name));
528      sb.append('/');
529      sb.append("scanner");
530      for (int i = 0; i < maxRetries; i++) {
531        Response response =
532          client.post(sb.toString(), Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
533        int code = response.getCode();
534        switch (code) {
535          case 201:
536            uri = response.getLocation();
537            return;
538          case 509:
539            try {
540              Thread.sleep(sleepTime);
541            } catch (InterruptedException e) {
542              throw (InterruptedIOException) new InterruptedIOException().initCause(e);
543            }
544            break;
545          default:
546            throw new IOException("scan request failed with " + code);
547        }
548      }
549      throw new IOException("scan request timed out");
550    }
551
552    public Result[] nextBatch() throws IOException {
553      StringBuilder sb = new StringBuilder(uri);
554      for (int i = 0; i < maxRetries; i++) {
555        Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF);
556        int code = response.getCode();
557        switch (code) {
558          case 200:
559            CellSetModel model = new CellSetModel();
560            model.getObjectFromMessage(response.getBody());
561            return buildResultFromModel(model);
562          case 204:
563          case 206:
564            return null;
565          case 509:
566            try {
567              Thread.sleep(sleepTime);
568            } catch (InterruptedException e) {
569              throw (InterruptedIOException) new InterruptedIOException().initCause(e);
570            }
571            break;
572          default:
573            throw new IOException("scanner.next request failed with " + code);
574        }
575      }
576      throw new IOException("scanner.next request timed out");
577    }
578
579    private boolean updateCachedResults() throws IOException {
580      if (cachedResults == null || nextCachedResultsRow >= cachedResults.length) {
581        nextCachedResultsRow = 0;
582        cachedResults = nextBatch();
583      }
584      return !(cachedResults == null || cachedResults.length < 1);
585    }
586
587    @Override
588    public Result[] next(int nbRows) throws IOException {
589      if (!updateCachedResults()) {
590        return null;
591      }
592      int endIndex = Math.min(cachedResults.length, nextCachedResultsRow + nbRows);
593      Result[] chunk = Arrays.copyOfRange(cachedResults, nextCachedResultsRow, endIndex);
594      nextCachedResultsRow = endIndex;
595      return chunk;
596    }
597
598    @Override
599    public Result next() throws IOException {
600      if (!updateCachedResults()) {
601        return null;
602      }
603      return cachedResults[nextCachedResultsRow++];
604    }
605
606    class Iter implements Iterator<Result> {
607
608      Result cache;
609
610      public Iter() {
611        try {
612          cache = Scanner.this.next();
613        } catch (IOException e) {
614          LOG.warn(StringUtils.stringifyException(e));
615        }
616      }
617
618      @Override
619      public boolean hasNext() {
620        return cache != null;
621      }
622
623      @Override
624      public Result next() {
625        Result result = cache;
626        try {
627          cache = Scanner.this.next();
628        } catch (IOException e) {
629          LOG.warn(StringUtils.stringifyException(e));
630          cache = null;
631        }
632        return result;
633      }
634
635      @Override
636      public void remove() {
637        throw new RuntimeException("remove() not supported");
638      }
639
640    }
641
642    @Override
643    public Iterator<Result> iterator() {
644      return new Iter();
645    }
646
647    @Override
648    public void close() {
649      try {
650        client.delete(uri);
651      } catch (IOException e) {
652        LOG.warn(StringUtils.stringifyException(e));
653      }
654    }
655
656    @Override
657    public boolean renewLease() {
658      throw new RuntimeException("renewLease() not supported");
659    }
660
661    @Override
662    public ScanMetrics getScanMetrics() {
663      throw new RuntimeException("getScanMetrics() not supported");
664    }
665  }
666
667  @Override
668  public ResultScanner getScanner(Scan scan) throws IOException {
669    return new Scanner(scan);
670  }
671
672  @Override
673  public ResultScanner getScanner(byte[] family) throws IOException {
674    Scan scan = new Scan();
675    scan.addFamily(family);
676    return new Scanner(scan);
677  }
678
679  @Override
680  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
681    Scan scan = new Scan();
682    scan.addColumn(family, qualifier);
683    return new Scanner(scan);
684  }
685
686  public boolean isAutoFlush() {
687    return true;
688  }
689
690  private boolean doCheckAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put)
691    throws IOException {
692    // column to check-the-value
693    put.add(new KeyValue(row, family, qualifier, value));
694
695    CellSetModel model = buildModelFromPut(put);
696    StringBuilder sb = new StringBuilder();
697    sb.append(pathPrefix);
698    sb.append(Bytes.toString(name));
699    sb.append('/');
700    sb.append(toURLEncodedBytes(put.getRow()));
701    sb.append("?check=put");
702
703    for (int i = 0; i < maxRetries; i++) {
704      Response response =
705        client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
706      int code = response.getCode();
707      switch (code) {
708        case 200:
709          return true;
710        case 304: // NOT-MODIFIED
711          return false;
712        case 509:
713          try {
714            Thread.sleep(sleepTime);
715          } catch (final InterruptedException e) {
716            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
717          }
718          break;
719        default:
720          throw new IOException("checkAndPut request failed with " + code);
721      }
722    }
723    throw new IOException("checkAndPut request timed out");
724  }
725
726  private boolean doCheckAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value,
727    Delete delete) throws IOException {
728    Put put = new Put(row, HConstants.LATEST_TIMESTAMP, delete.getFamilyCellMap());
729    // column to check-the-value
730    put.add(new KeyValue(row, family, qualifier, value));
731    CellSetModel model = buildModelFromPut(put);
732    StringBuilder sb = new StringBuilder();
733    sb.append(pathPrefix);
734    sb.append(Bytes.toString(name));
735    sb.append('/');
736    sb.append(toURLEncodedBytes(row));
737    sb.append("?check=delete");
738
739    for (int i = 0; i < maxRetries; i++) {
740      Response response =
741        client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
742      int code = response.getCode();
743      switch (code) {
744        case 200:
745          return true;
746        case 304: // NOT-MODIFIED
747          return false;
748        case 509:
749          try {
750            Thread.sleep(sleepTime);
751          } catch (final InterruptedException e) {
752            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
753          }
754          break;
755        default:
756          throw new IOException("checkAndDelete request failed with " + code);
757      }
758    }
759    throw new IOException("checkAndDelete request timed out");
760  }
761
762  @Override
763  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
764    return new CheckAndMutateBuilderImpl(row, family);
765  }
766
767  @Override
768  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
769    throw new NotImplementedException("Implement later");
770  }
771
772  @Override
773  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) {
774    throw new NotImplementedException("Implement later");
775  }
776
777  @Override
778  public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates) {
779    throw new NotImplementedException("Implement later");
780  }
781
782  @Override
783  public Result increment(Increment increment) throws IOException {
784    throw new IOException("Increment not supported");
785  }
786
787  @Override
788  public Result append(Append append) throws IOException {
789    throw new IOException("Append not supported");
790  }
791
792  @Override
793  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
794    throws IOException {
795    throw new IOException("incrementColumnValue not supported");
796  }
797
798  @Override
799  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
800    Durability durability) throws IOException {
801    throw new IOException("incrementColumnValue not supported");
802  }
803
804  @Override
805  public void batch(List<? extends Row> actions, Object[] results) throws IOException {
806    throw new IOException("batch not supported");
807  }
808
809  @Override
810  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
811    Batch.Callback<R> callback) throws IOException, InterruptedException {
812    throw new IOException("batchCallback not supported");
813  }
814
815  @Override
816  public CoprocessorRpcChannel coprocessorService(byte[] row) {
817    throw new UnsupportedOperationException("coprocessorService not implemented");
818  }
819
820  @Override
821  public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey,
822    byte[] endKey, Batch.Call<T, R> callable) throws ServiceException, Throwable {
823    throw new UnsupportedOperationException("coprocessorService not implemented");
824  }
825
826  @Override
827  public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
828    byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
829    throws ServiceException, Throwable {
830    throw new UnsupportedOperationException("coprocessorService not implemented");
831  }
832
833  @Override
834  public Result mutateRow(RowMutations rm) throws IOException {
835    throw new IOException("atomicMutation not supported");
836  }
837
838  @Override
839  public <R extends Message> Map<byte[], R> batchCoprocessorService(
840    Descriptors.MethodDescriptor method, Message request, byte[] startKey, byte[] endKey,
841    R responsePrototype) throws ServiceException, Throwable {
842    throw new UnsupportedOperationException("batchCoprocessorService not implemented");
843  }
844
845  @Override
846  public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor method,
847    Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
848    throws ServiceException, Throwable {
849    throw new UnsupportedOperationException("batchCoprocessorService not implemented");
850  }
851
852  @Override
853  public long getReadRpcTimeout(TimeUnit unit) {
854    throw new UnsupportedOperationException();
855  }
856
857  @Override
858  public long getRpcTimeout(TimeUnit unit) {
859    throw new UnsupportedOperationException();
860  }
861
862  @Override
863  public long getWriteRpcTimeout(TimeUnit unit) {
864    throw new UnsupportedOperationException();
865  }
866
867  @Override
868  public long getOperationTimeout(TimeUnit unit) {
869    throw new UnsupportedOperationException();
870  }
871
872  /*
873   * Only a small subset of characters are valid in URLs. Row keys, column families, and qualifiers
874   * cannot be appended to URLs without first URL escaping. Table names are ok because they can only
875   * contain alphanumeric, ".","_", and "-" which are valid characters in URLs.
876   */
877  private static String toURLEncodedBytes(byte[] row) {
878    try {
879      return URLEncoder.encode(new String(row, StandardCharsets.UTF_8), "UTF-8");
880    } catch (UnsupportedEncodingException e) {
881      throw new IllegalStateException("URLEncoder doesn't support UTF-8", e);
882    }
883  }
884
885  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
886
887    private final byte[] row;
888    private final byte[] family;
889    private byte[] qualifier;
890    private byte[] value;
891
892    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
893      this.row = Preconditions.checkNotNull(row, "row is null");
894      this.family = Preconditions.checkNotNull(family, "family is null");
895    }
896
897    @Override
898    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
899      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using"
900        + " an empty byte array, or just do not call this method if you want a null qualifier");
901      return this;
902    }
903
904    @Override
905    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
906      throw new UnsupportedOperationException("timeRange not implemented");
907    }
908
909    @Override
910    public CheckAndMutateBuilder ifNotExists() {
911      throw new UnsupportedOperationException(
912        "CheckAndMutate for non-equal comparison " + "not implemented");
913    }
914
915    @Override
916    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
917      if (compareOp == CompareOperator.EQUAL) {
918        this.value = Preconditions.checkNotNull(value, "value is null");
919        return this;
920      } else {
921        throw new UnsupportedOperationException(
922          "CheckAndMutate for non-equal comparison " + "not implemented");
923      }
924    }
925
926    @Override
927    public CheckAndMutateBuilder ifEquals(byte[] value) {
928      this.value = Preconditions.checkNotNull(value, "value is null");
929      return this;
930    }
931
932    @Override
933    public boolean thenPut(Put put) throws IOException {
934      return doCheckAndPut(row, family, qualifier, value, put);
935    }
936
937    @Override
938    public boolean thenDelete(Delete delete) throws IOException {
939      return doCheckAndDelete(row, family, qualifier, value, delete);
940    }
941
942    @Override
943    public boolean thenMutate(RowMutations mutation) throws IOException {
944      throw new UnsupportedOperationException("thenMutate not implemented");
945    }
946  }
947
948  @Override
949  public RegionLocator getRegionLocator() throws IOException {
950    throw new UnsupportedOperationException();
951  }
952}