001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.rest; 019 020import com.fasterxml.jackson.core.JsonParseException; 021import com.fasterxml.jackson.databind.JsonMappingException; 022import com.github.benmanes.caffeine.cache.Cache; 023import com.github.benmanes.caffeine.cache.Caffeine; 024import com.github.benmanes.caffeine.cache.RemovalCause; 025import java.io.IOException; 026import java.net.URI; 027import java.util.concurrent.TimeUnit; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.TableNotFoundException; 031import org.apache.hadoop.hbase.filter.Filter; 032import org.apache.hadoop.hbase.rest.model.ScannerModel; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hbase.thirdparty.javax.ws.rs.Consumes; 038import org.apache.hbase.thirdparty.javax.ws.rs.POST; 039import org.apache.hbase.thirdparty.javax.ws.rs.PUT; 040import org.apache.hbase.thirdparty.javax.ws.rs.Path; 041import org.apache.hbase.thirdparty.javax.ws.rs.PathParam; 042import org.apache.hbase.thirdparty.javax.ws.rs.core.Context; 043import org.apache.hbase.thirdparty.javax.ws.rs.core.Response; 044import org.apache.hbase.thirdparty.javax.ws.rs.core.UriBuilder; 045import org.apache.hbase.thirdparty.javax.ws.rs.core.UriInfo; 046 047@InterfaceAudience.Private 048public class ScannerResource extends ResourceBase { 049 050 private static final Logger LOG = LoggerFactory.getLogger(ScannerResource.class); 051 052 private static final Cache<String, ScannerInstanceResource> scanners = setupScanners(); 053 TableResource tableResource; 054 055 /** 056 * Constructor 057 */ 058 public ScannerResource(TableResource tableResource) throws IOException { 059 super(); 060 this.tableResource = tableResource; 061 } 062 063 private static Cache<String, ScannerInstanceResource> setupScanners() { 064 final Configuration conf = HBaseConfiguration.create(); 065 066 int size = conf.getInt(REST_SCANNERCACHE_SIZE, DEFAULT_REST_SCANNERCACHE_SIZE); 067 long evictTimeoutMs = conf.getTimeDuration(REST_SCANNERCACHE_EXPIRE_TIME, 068 DEFAULT_REST_SCANNERCACHE_EXPIRE_TIME_MS, TimeUnit.MILLISECONDS); 069 070 Cache<String, ScannerInstanceResource> cache = 071 Caffeine.newBuilder().removalListener(ScannerResource::removalListener).maximumSize(size) 072 .expireAfterAccess(evictTimeoutMs, TimeUnit.MILLISECONDS) 073 .<String, ScannerInstanceResource> build(); 074 075 return cache; 076 } 077 078 static boolean delete(final String id) { 079 ScannerInstanceResource instance = scanners.asMap().remove(id); 080 if (instance != null) { 081 instance.generator.close(); 082 return true; 083 } else { 084 return false; 085 } 086 } 087 088 static void removalListener(String key, ScannerInstanceResource value, RemovalCause cause) { 089 if (cause.wasEvicted()) { 090 delete(key); 091 } 092 } 093 094 Response update(final ScannerModel model, final boolean replace, final UriInfo uriInfo) { 095 servlet.getMetrics().incrementRequests(1); 096 if (servlet.isReadOnly()) { 097 return Response.status(Response.Status.FORBIDDEN).type(MIMETYPE_TEXT) 098 .entity("Forbidden" + CRLF).build(); 099 } 100 byte[] endRow = model.hasEndRow() ? model.getEndRow() : null; 101 RowSpec spec = null; 102 if (model.getLabels() != null) { 103 spec = new RowSpec(model.getStartRow(), endRow, model.getColumns(), model.getStartTime(), 104 model.getEndTime(), model.getMaxVersions(), model.getLabels()); 105 } else { 106 spec = new RowSpec(model.getStartRow(), endRow, model.getColumns(), model.getStartTime(), 107 model.getEndTime(), model.getMaxVersions()); 108 } 109 110 try { 111 Filter filter = ScannerResultGenerator.buildFilterFromModel(model); 112 String tableName = tableResource.getName(); 113 ScannerResultGenerator gen = new ScannerResultGenerator(tableName, spec, filter, 114 model.getCaching(), model.getCacheBlocks(), model.getLimit(), model.isIncludeStartRow(), 115 model.isIncludeStopRow()); 116 String id = gen.getID(); 117 ScannerInstanceResource instance = 118 new ScannerInstanceResource(tableName, id, gen, model.getBatch()); 119 scanners.put(id, instance); 120 if (LOG.isTraceEnabled()) { 121 LOG.trace("new scanner: " + id); 122 } 123 UriBuilder builder = uriInfo.getAbsolutePathBuilder(); 124 URI uri = builder.path(id).build(); 125 servlet.getMetrics().incrementSucessfulPutRequests(1); 126 return Response.created(uri).build(); 127 } catch (Exception e) { 128 LOG.error("Exception occurred while processing " + uriInfo.getAbsolutePath() + " : ", e); 129 servlet.getMetrics().incrementFailedPutRequests(1); 130 if (e instanceof TableNotFoundException) { 131 return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT) 132 .entity("Not found" + CRLF).build(); 133 } else if ( 134 e instanceof RuntimeException 135 || e instanceof JsonMappingException | e instanceof JsonParseException 136 ) { 137 return Response.status(Response.Status.BAD_REQUEST).type(MIMETYPE_TEXT) 138 .entity("Bad request" + CRLF).build(); 139 } 140 return Response.status(Response.Status.SERVICE_UNAVAILABLE).type(MIMETYPE_TEXT) 141 .entity("Unavailable" + CRLF).build(); 142 } 143 } 144 145 @PUT 146 @Consumes({ MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF, MIMETYPE_PROTOBUF_IETF }) 147 public Response put(final ScannerModel model, final @Context UriInfo uriInfo) { 148 if (LOG.isTraceEnabled()) { 149 LOG.trace("PUT " + uriInfo.getAbsolutePath()); 150 } 151 return update(model, true, uriInfo); 152 } 153 154 @POST 155 @Consumes({ MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF, MIMETYPE_PROTOBUF_IETF }) 156 public Response post(final ScannerModel model, final @Context UriInfo uriInfo) { 157 if (LOG.isTraceEnabled()) { 158 LOG.trace("POST " + uriInfo.getAbsolutePath()); 159 } 160 return update(model, false, uriInfo); 161 } 162 163 @Path("{scanner: .+}") 164 public ScannerInstanceResource getScannerInstanceResource(final @PathParam("scanner") String id) 165 throws IOException { 166 ScannerInstanceResource instance = scanners.getIfPresent(id); 167 if (instance == null) { 168 servlet.getMetrics().incrementFailedGetRequests(1); 169 return new ScannerInstanceResource(); 170 } else { 171 servlet.getMetrics().incrementSucessfulGetRequests(1); 172 } 173 return instance; 174 } 175}