001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rest;
019
020import com.fasterxml.jackson.core.JsonParseException;
021import com.fasterxml.jackson.databind.JsonMappingException;
022import com.github.benmanes.caffeine.cache.Cache;
023import com.github.benmanes.caffeine.cache.Caffeine;
024import com.github.benmanes.caffeine.cache.RemovalCause;
025import java.io.IOException;
026import java.net.URI;
027import java.util.concurrent.TimeUnit;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.TableNotFoundException;
031import org.apache.hadoop.hbase.filter.Filter;
032import org.apache.hadoop.hbase.rest.model.ScannerModel;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import org.apache.hbase.thirdparty.javax.ws.rs.Consumes;
038import org.apache.hbase.thirdparty.javax.ws.rs.POST;
039import org.apache.hbase.thirdparty.javax.ws.rs.PUT;
040import org.apache.hbase.thirdparty.javax.ws.rs.Path;
041import org.apache.hbase.thirdparty.javax.ws.rs.PathParam;
042import org.apache.hbase.thirdparty.javax.ws.rs.core.Context;
043import org.apache.hbase.thirdparty.javax.ws.rs.core.Response;
044import org.apache.hbase.thirdparty.javax.ws.rs.core.UriBuilder;
045import org.apache.hbase.thirdparty.javax.ws.rs.core.UriInfo;
046
047@InterfaceAudience.Private
048public class ScannerResource extends ResourceBase {
049
050  private static final Logger LOG = LoggerFactory.getLogger(ScannerResource.class);
051
052  private static final Cache<String, ScannerInstanceResource> scanners = setupScanners();
053  TableResource tableResource;
054
055  /**
056   * Constructor
057   */
058  public ScannerResource(TableResource tableResource) throws IOException {
059    super();
060    this.tableResource = tableResource;
061  }
062
063  private static Cache<String, ScannerInstanceResource> setupScanners() {
064    final Configuration conf = HBaseConfiguration.create();
065
066    int size = conf.getInt(REST_SCANNERCACHE_SIZE, DEFAULT_REST_SCANNERCACHE_SIZE);
067    long evictTimeoutMs = conf.getTimeDuration(REST_SCANNERCACHE_EXPIRE_TIME,
068      DEFAULT_REST_SCANNERCACHE_EXPIRE_TIME_MS, TimeUnit.MILLISECONDS);
069
070    Cache<String, ScannerInstanceResource> cache =
071      Caffeine.newBuilder().removalListener(ScannerResource::removalListener).maximumSize(size)
072        .expireAfterAccess(evictTimeoutMs, TimeUnit.MILLISECONDS)
073        .<String, ScannerInstanceResource> build();
074
075    return cache;
076  }
077
078  static boolean delete(final String id) {
079    ScannerInstanceResource instance = scanners.asMap().remove(id);
080    if (instance != null) {
081      instance.generator.close();
082      return true;
083    } else {
084      return false;
085    }
086  }
087
088  static void removalListener(String key, ScannerInstanceResource value, RemovalCause cause) {
089    if (cause.wasEvicted()) {
090      delete(key);
091    }
092  }
093
094  Response update(final ScannerModel model, final boolean replace, final UriInfo uriInfo) {
095    servlet.getMetrics().incrementRequests(1);
096    if (servlet.isReadOnly()) {
097      return Response.status(Response.Status.FORBIDDEN).type(MIMETYPE_TEXT)
098        .entity("Forbidden" + CRLF).build();
099    }
100    byte[] endRow = model.hasEndRow() ? model.getEndRow() : null;
101    RowSpec spec = null;
102    if (model.getLabels() != null) {
103      spec = new RowSpec(model.getStartRow(), endRow, model.getColumns(), model.getStartTime(),
104        model.getEndTime(), model.getMaxVersions(), model.getLabels());
105    } else {
106      spec = new RowSpec(model.getStartRow(), endRow, model.getColumns(), model.getStartTime(),
107        model.getEndTime(), model.getMaxVersions());
108    }
109
110    try {
111      Filter filter = ScannerResultGenerator.buildFilterFromModel(model);
112      String tableName = tableResource.getName();
113      ScannerResultGenerator gen = new ScannerResultGenerator(tableName, spec, filter,
114        model.getCaching(), model.getCacheBlocks(), model.getLimit(), model.isIncludeStartRow(),
115        model.isIncludeStopRow());
116      String id = gen.getID();
117      ScannerInstanceResource instance =
118        new ScannerInstanceResource(tableName, id, gen, model.getBatch());
119      scanners.put(id, instance);
120      if (LOG.isTraceEnabled()) {
121        LOG.trace("new scanner: " + id);
122      }
123      UriBuilder builder = uriInfo.getAbsolutePathBuilder();
124      URI uri = builder.path(id).build();
125      servlet.getMetrics().incrementSucessfulPutRequests(1);
126      return Response.created(uri).build();
127    } catch (Exception e) {
128      LOG.error("Exception occurred while processing " + uriInfo.getAbsolutePath() + " : ", e);
129      servlet.getMetrics().incrementFailedPutRequests(1);
130      if (e instanceof TableNotFoundException) {
131        return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT)
132          .entity("Not found" + CRLF).build();
133      } else if (
134        e instanceof RuntimeException
135          || e instanceof JsonMappingException | e instanceof JsonParseException
136      ) {
137        return Response.status(Response.Status.BAD_REQUEST).type(MIMETYPE_TEXT)
138          .entity("Bad request" + CRLF).build();
139      }
140      return Response.status(Response.Status.SERVICE_UNAVAILABLE).type(MIMETYPE_TEXT)
141        .entity("Unavailable" + CRLF).build();
142    }
143  }
144
145  @PUT
146  @Consumes({ MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF, MIMETYPE_PROTOBUF_IETF })
147  public Response put(final ScannerModel model, final @Context UriInfo uriInfo) {
148    if (LOG.isTraceEnabled()) {
149      LOG.trace("PUT " + uriInfo.getAbsolutePath());
150    }
151    return update(model, true, uriInfo);
152  }
153
154  @POST
155  @Consumes({ MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF, MIMETYPE_PROTOBUF_IETF })
156  public Response post(final ScannerModel model, final @Context UriInfo uriInfo) {
157    if (LOG.isTraceEnabled()) {
158      LOG.trace("POST " + uriInfo.getAbsolutePath());
159    }
160    return update(model, false, uriInfo);
161  }
162
163  @Path("{scanner: .+}")
164  public ScannerInstanceResource getScannerInstanceResource(final @PathParam("scanner") String id)
165    throws IOException {
166    ScannerInstanceResource instance = scanners.getIfPresent(id);
167    if (instance == null) {
168      servlet.getMetrics().incrementFailedGetRequests(1);
169      return new ScannerInstanceResource();
170    } else {
171      servlet.getMetrics().incrementSucessfulGetRequests(1);
172    }
173    return instance;
174  }
175}