001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.protobuf.Service; 021import java.io.IOException; 022import java.lang.reflect.InvocationTargetException; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.hbase.CacheEvictionStats; 025import org.apache.hadoop.hbase.ServerName; 026import org.apache.hadoop.hbase.SharedConnection; 027import org.apache.hadoop.hbase.client.Connection; 028import org.apache.hadoop.hbase.client.Mutation; 029import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 030import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 031import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity; 032import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 033import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 034import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 035import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; 036import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; 037import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; 038import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService; 039import org.apache.hadoop.hbase.metrics.MetricRegistry; 040import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 041import org.apache.hadoop.hbase.security.User; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 047 048@InterfaceAudience.Private 049public class RegionServerCoprocessorHost 050 extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> { 051 052 private static final Logger LOG = LoggerFactory.getLogger(RegionServerCoprocessorHost.class); 053 054 private RegionServerServices rsServices; 055 056 public RegionServerCoprocessorHost(RegionServerServices rsServices, Configuration conf) { 057 super(rsServices); 058 this.rsServices = rsServices; 059 this.conf = conf; 060 // Log the state of coprocessor loading here; should appear only once or 061 // twice in the daemon log, depending on HBase version, because there is 062 // only one RegionServerCoprocessorHost instance in the RS process 063 boolean coprocessorsEnabled = 064 conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED); 065 boolean tableCoprocessorsEnabled = 066 conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED); 067 LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled")); 068 LOG.info("Table coprocessor loading is " 069 + ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled")); 070 loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY); 071 } 072 073 @Override 074 public RegionServerEnvironment createEnvironment(RegionServerCoprocessor instance, int priority, 075 int sequence, Configuration conf) { 076 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 077 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class) 078 ? new RegionServerEnvironmentForCoreCoprocessors(instance, priority, sequence, conf, 079 this.rsServices) 080 : new RegionServerEnvironment(instance, priority, sequence, conf, this.rsServices); 081 } 082 083 @Override 084 public RegionServerCoprocessor checkAndGetInstance(Class<?> implClass) 085 throws InstantiationException, IllegalAccessException { 086 try { 087 if (RegionServerCoprocessor.class.isAssignableFrom(implClass)) { 088 return implClass.asSubclass(RegionServerCoprocessor.class).getDeclaredConstructor() 089 .newInstance(); 090 } else if (SingletonCoprocessorService.class.isAssignableFrom(implClass)) { 091 // For backward compatibility with old CoprocessorService impl which don't extend 092 // RegionCoprocessor. 093 SingletonCoprocessorService tmp = implClass.asSubclass(SingletonCoprocessorService.class) 094 .getDeclaredConstructor().newInstance(); 095 return new CoprocessorServiceBackwardCompatiblity.RegionServerCoprocessorService(tmp); 096 } else { 097 LOG.error("{} is not of type RegionServerCoprocessor. Check the configuration of {}", 098 implClass.getName(), CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY); 099 return null; 100 } 101 } catch (NoSuchMethodException | InvocationTargetException e) { 102 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 103 } 104 } 105 106 private ObserverGetter<RegionServerCoprocessor, RegionServerObserver> rsObserverGetter = 107 RegionServerCoprocessor::getRegionServerObserver; 108 109 abstract class RegionServerObserverOperation 110 extends ObserverOperationWithoutResult<RegionServerObserver> { 111 public RegionServerObserverOperation() { 112 super(rsObserverGetter); 113 } 114 115 public RegionServerObserverOperation(User user) { 116 super(rsObserverGetter, user); 117 } 118 } 119 120 ////////////////////////////////////////////////////////////////////////////////////////////////// 121 // RegionServerObserver operations 122 ////////////////////////////////////////////////////////////////////////////////////////////////// 123 124 public void preStop(String message, User user) throws IOException { 125 // While stopping the region server all coprocessors method should be executed first then the 126 // coprocessor should be cleaned up. 127 if (coprocEnvironments.isEmpty()) { 128 return; 129 } 130 execShutdown(new RegionServerObserverOperation(user) { 131 @Override 132 public void call(RegionServerObserver observer) throws IOException { 133 observer.preStopRegionServer(this); 134 } 135 136 @Override 137 public void postEnvCall() { 138 // invoke coprocessor stop method 139 shutdown(this.getEnvironment()); 140 } 141 }); 142 } 143 144 public void preRollWALWriterRequest() throws IOException { 145 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 146 @Override 147 public void call(RegionServerObserver observer) throws IOException { 148 observer.preRollWALWriterRequest(this); 149 } 150 }); 151 } 152 153 public void postRollWALWriterRequest() throws IOException { 154 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 155 @Override 156 public void call(RegionServerObserver observer) throws IOException { 157 observer.postRollWALWriterRequest(this); 158 } 159 }); 160 } 161 162 public void preReplicateLogEntries() throws IOException { 163 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 164 @Override 165 public void call(RegionServerObserver observer) throws IOException { 166 observer.preReplicateLogEntries(this); 167 } 168 }); 169 } 170 171 public void postReplicateLogEntries() throws IOException { 172 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 173 @Override 174 public void call(RegionServerObserver observer) throws IOException { 175 observer.postReplicateLogEntries(this); 176 } 177 }); 178 } 179 180 public void preReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation) 181 throws IOException { 182 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 183 @Override 184 public void call(RegionServerObserver observer) throws IOException { 185 observer.preReplicationSinkBatchMutate(this, walEntry, mutation); 186 } 187 }); 188 } 189 190 public void postReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation) 191 throws IOException { 192 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 193 @Override 194 public void call(RegionServerObserver observer) throws IOException { 195 observer.postReplicationSinkBatchMutate(this, walEntry, mutation); 196 } 197 }); 198 } 199 200 public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint) 201 throws IOException { 202 if (this.coprocEnvironments.isEmpty()) { 203 return endpoint; 204 } 205 return execOperationWithResult( 206 new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(rsObserverGetter, 207 endpoint) { 208 @Override 209 public ReplicationEndpoint call(RegionServerObserver observer) throws IOException { 210 return observer.postCreateReplicationEndPoint(this, getResult()); 211 } 212 }); 213 } 214 215 public void preClearCompactionQueues() throws IOException { 216 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 217 @Override 218 public void call(RegionServerObserver observer) throws IOException { 219 observer.preClearCompactionQueues(this); 220 } 221 }); 222 } 223 224 public void postClearCompactionQueues() throws IOException { 225 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 226 @Override 227 public void call(RegionServerObserver observer) throws IOException { 228 observer.postClearCompactionQueues(this); 229 } 230 }); 231 } 232 233 public void preExecuteProcedures() throws IOException { 234 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 235 @Override 236 public void call(RegionServerObserver observer) throws IOException { 237 observer.preExecuteProcedures(this); 238 } 239 }); 240 } 241 242 public void postExecuteProcedures() throws IOException { 243 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 244 @Override 245 public void call(RegionServerObserver observer) throws IOException { 246 observer.postExecuteProcedures(this); 247 } 248 }); 249 } 250 251 public void preUpdateConfiguration(Configuration preReloadConf) throws IOException { 252 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 253 @Override 254 public void call(RegionServerObserver observer) throws IOException { 255 observer.preUpdateRegionServerConfiguration(this, preReloadConf); 256 } 257 }); 258 } 259 260 public void postUpdateConfiguration(Configuration postReloadConf) throws IOException { 261 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 262 @Override 263 public void call(RegionServerObserver observer) throws IOException { 264 observer.postUpdateRegionServerConfiguration(this, postReloadConf); 265 } 266 }); 267 } 268 269 public void preClearRegionBlockCache() throws IOException { 270 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 271 @Override 272 public void call(RegionServerObserver observer) throws IOException { 273 observer.preClearRegionBlockCache(this); 274 } 275 }); 276 } 277 278 public void postClearRegionBlockCache(CacheEvictionStats stats) throws IOException { 279 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 280 @Override 281 public void call(RegionServerObserver observer) throws IOException { 282 observer.postClearRegionBlockCache(this, stats); 283 } 284 }); 285 } 286 287 /** 288 * Coprocessor environment extension providing access to region server related services. 289 */ 290 private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor> 291 implements RegionServerCoprocessorEnvironment { 292 private final MetricRegistry metricRegistry; 293 private final RegionServerServices services; 294 295 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BC_UNCONFIRMED_CAST", 296 justification = "Intentional; FB has trouble detecting isAssignableFrom") 297 public RegionServerEnvironment(final RegionServerCoprocessor impl, final int priority, 298 final int seq, final Configuration conf, final RegionServerServices services) { 299 super(impl, priority, seq, conf); 300 // If coprocessor exposes any services, register them. 301 for (Service service : impl.getServices()) { 302 services.registerService(service); 303 } 304 this.services = services; 305 this.metricRegistry = 306 MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName()); 307 } 308 309 @Override 310 public OnlineRegions getOnlineRegions() { 311 return this.services; 312 } 313 314 @Override 315 public ServerName getServerName() { 316 return this.services.getServerName(); 317 } 318 319 @Override 320 public Connection getConnection() { 321 return new SharedConnection(this.services.getConnection()); 322 } 323 324 @Override 325 public Connection createConnection(Configuration conf) throws IOException { 326 return this.services.createConnection(conf); 327 } 328 329 @Override 330 public MetricRegistry getMetricRegistryForRegionServer() { 331 return metricRegistry; 332 } 333 334 @Override 335 public void shutdown() { 336 super.shutdown(); 337 MetricsCoprocessor.removeRegistry(metricRegistry); 338 } 339 } 340 341 /** 342 * Special version of RegionServerEnvironment that exposes RegionServerServices for Core 343 * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core. 344 */ 345 private static class RegionServerEnvironmentForCoreCoprocessors extends RegionServerEnvironment 346 implements HasRegionServerServices { 347 final RegionServerServices regionServerServices; 348 349 public RegionServerEnvironmentForCoreCoprocessors(final RegionServerCoprocessor impl, 350 final int priority, final int seq, final Configuration conf, 351 final RegionServerServices services) { 352 super(impl, priority, seq, conf, services); 353 this.regionServerServices = services; 354 } 355 356 /** 357 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 358 * consumption. 359 */ 360 @Override 361 public RegionServerServices getRegionServerServices() { 362 return this.regionServerServices; 363 } 364 } 365}