001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.rest; 019 020import com.fasterxml.jackson.core.JsonParseException; 021import com.fasterxml.jackson.databind.JsonMappingException; 022import com.github.benmanes.caffeine.cache.Cache; 023import com.github.benmanes.caffeine.cache.Caffeine; 024import com.github.benmanes.caffeine.cache.RemovalCause; 025import java.io.IOException; 026import java.net.URI; 027import java.util.concurrent.TimeUnit; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.TableNotFoundException; 031import org.apache.hadoop.hbase.filter.Filter; 032import org.apache.hadoop.hbase.rest.model.ScannerModel; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hbase.thirdparty.javax.ws.rs.Consumes; 038import org.apache.hbase.thirdparty.javax.ws.rs.POST; 039import org.apache.hbase.thirdparty.javax.ws.rs.PUT; 040import org.apache.hbase.thirdparty.javax.ws.rs.Path; 041import org.apache.hbase.thirdparty.javax.ws.rs.PathParam; 042import org.apache.hbase.thirdparty.javax.ws.rs.core.Context; 043import org.apache.hbase.thirdparty.javax.ws.rs.core.Response; 044import org.apache.hbase.thirdparty.javax.ws.rs.core.UriBuilder; 045import org.apache.hbase.thirdparty.javax.ws.rs.core.UriInfo; 046 047@InterfaceAudience.Private 048public class ScannerResource extends ResourceBase { 049 050 private static final Logger LOG = LoggerFactory.getLogger(ScannerResource.class); 051 052 private static final Cache<String, ScannerInstanceResource> scanners = setupScanners(); 053 TableResource tableResource; 054 055 /** 056 * Constructor 057 */ 058 public ScannerResource(TableResource tableResource) throws IOException { 059 super(); 060 this.tableResource = tableResource; 061 } 062 063 private static Cache<String, ScannerInstanceResource> setupScanners() { 064 final Configuration conf = HBaseConfiguration.create(); 065 066 int size = conf.getInt(REST_SCANNERCACHE_SIZE, DEFAULT_REST_SCANNERCACHE_SIZE); 067 long evictTimeoutMs = conf.getTimeDuration(REST_SCANNERCACHE_EXPIRE_TIME, 068 DEFAULT_REST_SCANNERCACHE_EXPIRE_TIME_MS, TimeUnit.MILLISECONDS); 069 070 Cache<String, ScannerInstanceResource> cache = 071 Caffeine.newBuilder().removalListener(ScannerResource::removalListener).maximumSize(size) 072 .expireAfterAccess(evictTimeoutMs, TimeUnit.MILLISECONDS) 073 .<String, ScannerInstanceResource> build(); 074 075 return cache; 076 } 077 078 static boolean delete(final String id) { 079 ScannerInstanceResource instance = scanners.asMap().remove(id); 080 if (instance != null) { 081 instance.generator.close(); 082 return true; 083 } else { 084 return false; 085 } 086 } 087 088 static void removalListener(String key, ScannerInstanceResource value, RemovalCause cause) { 089 if (cause.wasEvicted()) { 090 delete(key); 091 } 092 } 093 094 Response update(final ScannerModel model, final boolean replace, final UriInfo uriInfo) { 095 servlet.getMetrics().incrementRequests(1); 096 if (servlet.isReadOnly()) { 097 return Response.status(Response.Status.FORBIDDEN).type(MIMETYPE_TEXT) 098 .entity("Forbidden" + CRLF).build(); 099 } 100 byte[] endRow = model.hasEndRow() ? model.getEndRow() : null; 101 RowSpec spec = null; 102 if (model.getLabels() != null) { 103 spec = new RowSpec(model.getStartRow(), endRow, model.getColumns(), model.getStartTime(), 104 model.getEndTime(), model.getMaxVersions(), model.getLabels()); 105 } else { 106 spec = new RowSpec(model.getStartRow(), endRow, model.getColumns(), model.getStartTime(), 107 model.getEndTime(), model.getMaxVersions()); 108 } 109 110 try { 111 Filter filter = ScannerResultGenerator.buildFilterFromModel(model); 112 String tableName = tableResource.getName(); 113 ScannerResultGenerator gen = new ScannerResultGenerator(tableName, spec, filter, 114 model.getCaching(), model.getCacheBlocks()); 115 String id = gen.getID(); 116 ScannerInstanceResource instance = 117 new ScannerInstanceResource(tableName, id, gen, model.getBatch()); 118 scanners.put(id, instance); 119 if (LOG.isTraceEnabled()) { 120 LOG.trace("new scanner: " + id); 121 } 122 UriBuilder builder = uriInfo.getAbsolutePathBuilder(); 123 URI uri = builder.path(id).build(); 124 servlet.getMetrics().incrementSucessfulPutRequests(1); 125 return Response.created(uri).build(); 126 } catch (Exception e) { 127 LOG.error("Exception occurred while processing " + uriInfo.getAbsolutePath() + " : ", e); 128 servlet.getMetrics().incrementFailedPutRequests(1); 129 if (e instanceof TableNotFoundException) { 130 return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT) 131 .entity("Not found" + CRLF).build(); 132 } else if ( 133 e instanceof RuntimeException 134 || e instanceof JsonMappingException | e instanceof JsonParseException 135 ) { 136 return Response.status(Response.Status.BAD_REQUEST).type(MIMETYPE_TEXT) 137 .entity("Bad request" + CRLF).build(); 138 } 139 return Response.status(Response.Status.SERVICE_UNAVAILABLE).type(MIMETYPE_TEXT) 140 .entity("Unavailable" + CRLF).build(); 141 } 142 } 143 144 @PUT 145 @Consumes({ MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF, MIMETYPE_PROTOBUF_IETF }) 146 public Response put(final ScannerModel model, final @Context UriInfo uriInfo) { 147 if (LOG.isTraceEnabled()) { 148 LOG.trace("PUT " + uriInfo.getAbsolutePath()); 149 } 150 return update(model, true, uriInfo); 151 } 152 153 @POST 154 @Consumes({ MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF, MIMETYPE_PROTOBUF_IETF }) 155 public Response post(final ScannerModel model, final @Context UriInfo uriInfo) { 156 if (LOG.isTraceEnabled()) { 157 LOG.trace("POST " + uriInfo.getAbsolutePath()); 158 } 159 return update(model, false, uriInfo); 160 } 161 162 @Path("{scanner: .+}") 163 public ScannerInstanceResource getScannerInstanceResource(final @PathParam("scanner") String id) 164 throws IOException { 165 ScannerInstanceResource instance = scanners.getIfPresent(id); 166 if (instance == null) { 167 servlet.getMetrics().incrementFailedGetRequests(1); 168 return new ScannerInstanceResource(); 169 } else { 170 servlet.getMetrics().incrementSucessfulGetRequests(1); 171 } 172 return instance; 173 } 174}