001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.protobuf.Service;
021import java.io.IOException;
022import java.lang.reflect.InvocationTargetException;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.CacheEvictionStats;
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.SharedConnection;
027import org.apache.hadoop.hbase.client.Connection;
028import org.apache.hadoop.hbase.client.Mutation;
029import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
030import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
031import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
032import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
033import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
034import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
035import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
036import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
037import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
038import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
039import org.apache.hadoop.hbase.metrics.MetricRegistry;
040import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
041import org.apache.hadoop.hbase.security.User;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
047
048@InterfaceAudience.Private
049public class RegionServerCoprocessorHost
050  extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> {
051
052  private static final Logger LOG = LoggerFactory.getLogger(RegionServerCoprocessorHost.class);
053
054  private RegionServerServices rsServices;
055
056  public RegionServerCoprocessorHost(RegionServerServices rsServices, Configuration conf) {
057    super(rsServices);
058    this.rsServices = rsServices;
059    this.conf = conf;
060    // Log the state of coprocessor loading here; should appear only once or
061    // twice in the daemon log, depending on HBase version, because there is
062    // only one RegionServerCoprocessorHost instance in the RS process
063    boolean coprocessorsEnabled =
064      conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
065    boolean tableCoprocessorsEnabled =
066      conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED);
067    LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
068    LOG.info("Table coprocessor loading is "
069      + ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
070    loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
071  }
072
073  @Override
074  public RegionServerEnvironment createEnvironment(RegionServerCoprocessor instance, int priority,
075    int sequence, Configuration conf) {
076    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
077    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)
078      ? new RegionServerEnvironmentForCoreCoprocessors(instance, priority, sequence, conf,
079        this.rsServices)
080      : new RegionServerEnvironment(instance, priority, sequence, conf, this.rsServices);
081  }
082
083  @Override
084  public RegionServerCoprocessor checkAndGetInstance(Class<?> implClass)
085    throws InstantiationException, IllegalAccessException {
086    try {
087      if (RegionServerCoprocessor.class.isAssignableFrom(implClass)) {
088        return implClass.asSubclass(RegionServerCoprocessor.class).getDeclaredConstructor()
089          .newInstance();
090      } else if (SingletonCoprocessorService.class.isAssignableFrom(implClass)) {
091        // For backward compatibility with old CoprocessorService impl which don't extend
092        // RegionCoprocessor.
093        SingletonCoprocessorService tmp = implClass.asSubclass(SingletonCoprocessorService.class)
094          .getDeclaredConstructor().newInstance();
095        return new CoprocessorServiceBackwardCompatiblity.RegionServerCoprocessorService(tmp);
096      } else {
097        LOG.error("{} is not of type RegionServerCoprocessor. Check the configuration of {}",
098          implClass.getName(), CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY);
099        return null;
100      }
101    } catch (NoSuchMethodException | InvocationTargetException e) {
102      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
103    }
104  }
105
106  private ObserverGetter<RegionServerCoprocessor, RegionServerObserver> rsObserverGetter =
107    RegionServerCoprocessor::getRegionServerObserver;
108
109  abstract class RegionServerObserverOperation
110    extends ObserverOperationWithoutResult<RegionServerObserver> {
111    public RegionServerObserverOperation() {
112      super(rsObserverGetter);
113    }
114
115    public RegionServerObserverOperation(User user) {
116      super(rsObserverGetter, user);
117    }
118  }
119
120  //////////////////////////////////////////////////////////////////////////////////////////////////
121  // RegionServerObserver operations
122  //////////////////////////////////////////////////////////////////////////////////////////////////
123
124  public void preStop(String message, User user) throws IOException {
125    // While stopping the region server all coprocessors method should be executed first then the
126    // coprocessor should be cleaned up.
127    if (coprocEnvironments.isEmpty()) {
128      return;
129    }
130    execShutdown(new RegionServerObserverOperation(user) {
131      @Override
132      public void call(RegionServerObserver observer) throws IOException {
133        observer.preStopRegionServer(this);
134      }
135
136      @Override
137      public void postEnvCall() {
138        // invoke coprocessor stop method
139        shutdown(this.getEnvironment());
140      }
141    });
142  }
143
144  public void preRollWALWriterRequest() throws IOException {
145    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
146      @Override
147      public void call(RegionServerObserver observer) throws IOException {
148        observer.preRollWALWriterRequest(this);
149      }
150    });
151  }
152
153  public void postRollWALWriterRequest() throws IOException {
154    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
155      @Override
156      public void call(RegionServerObserver observer) throws IOException {
157        observer.postRollWALWriterRequest(this);
158      }
159    });
160  }
161
162  public void preReplicateLogEntries() throws IOException {
163    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
164      @Override
165      public void call(RegionServerObserver observer) throws IOException {
166        observer.preReplicateLogEntries(this);
167      }
168    });
169  }
170
171  public void postReplicateLogEntries() throws IOException {
172    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
173      @Override
174      public void call(RegionServerObserver observer) throws IOException {
175        observer.postReplicateLogEntries(this);
176      }
177    });
178  }
179
180  public void preReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
181    throws IOException {
182    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
183      @Override
184      public void call(RegionServerObserver observer) throws IOException {
185        observer.preReplicationSinkBatchMutate(this, walEntry, mutation);
186      }
187    });
188  }
189
190  public void postReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
191    throws IOException {
192    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
193      @Override
194      public void call(RegionServerObserver observer) throws IOException {
195        observer.postReplicationSinkBatchMutate(this, walEntry, mutation);
196      }
197    });
198  }
199
200  public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
201    throws IOException {
202    if (this.coprocEnvironments.isEmpty()) {
203      return endpoint;
204    }
205    return execOperationWithResult(
206      new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(rsObserverGetter,
207        endpoint) {
208        @Override
209        public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
210          return observer.postCreateReplicationEndPoint(this, getResult());
211        }
212      });
213  }
214
215  public void preClearCompactionQueues() throws IOException {
216    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
217      @Override
218      public void call(RegionServerObserver observer) throws IOException {
219        observer.preClearCompactionQueues(this);
220      }
221    });
222  }
223
224  public void postClearCompactionQueues() throws IOException {
225    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
226      @Override
227      public void call(RegionServerObserver observer) throws IOException {
228        observer.postClearCompactionQueues(this);
229      }
230    });
231  }
232
233  public void preExecuteProcedures() throws IOException {
234    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
235      @Override
236      public void call(RegionServerObserver observer) throws IOException {
237        observer.preExecuteProcedures(this);
238      }
239    });
240  }
241
242  public void postExecuteProcedures() throws IOException {
243    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
244      @Override
245      public void call(RegionServerObserver observer) throws IOException {
246        observer.postExecuteProcedures(this);
247      }
248    });
249  }
250
251  public void preUpdateConfiguration(Configuration preReloadConf) throws IOException {
252    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
253      @Override
254      public void call(RegionServerObserver observer) throws IOException {
255        observer.preUpdateRegionServerConfiguration(this, preReloadConf);
256      }
257    });
258  }
259
260  public void postUpdateConfiguration(Configuration postReloadConf) throws IOException {
261    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
262      @Override
263      public void call(RegionServerObserver observer) throws IOException {
264        observer.postUpdateRegionServerConfiguration(this, postReloadConf);
265      }
266    });
267  }
268
269  public void preClearRegionBlockCache() throws IOException {
270    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
271      @Override
272      public void call(RegionServerObserver observer) throws IOException {
273        observer.preClearRegionBlockCache(this);
274      }
275    });
276  }
277
278  public void postClearRegionBlockCache(CacheEvictionStats stats) throws IOException {
279    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
280      @Override
281      public void call(RegionServerObserver observer) throws IOException {
282        observer.postClearRegionBlockCache(this, stats);
283      }
284    });
285  }
286
287  /**
288   * Coprocessor environment extension providing access to region server related services.
289   */
290  private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor>
291    implements RegionServerCoprocessorEnvironment {
292    private final MetricRegistry metricRegistry;
293    private final RegionServerServices services;
294
295    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BC_UNCONFIRMED_CAST",
296        justification = "Intentional; FB has trouble detecting isAssignableFrom")
297    public RegionServerEnvironment(final RegionServerCoprocessor impl, final int priority,
298      final int seq, final Configuration conf, final RegionServerServices services) {
299      super(impl, priority, seq, conf);
300      // If coprocessor exposes any services, register them.
301      for (Service service : impl.getServices()) {
302        services.registerService(service);
303      }
304      this.services = services;
305      this.metricRegistry =
306        MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName());
307    }
308
309    @Override
310    public OnlineRegions getOnlineRegions() {
311      return this.services;
312    }
313
314    @Override
315    public ServerName getServerName() {
316      return this.services.getServerName();
317    }
318
319    @Override
320    public Connection getConnection() {
321      return new SharedConnection(this.services.getConnection());
322    }
323
324    @Override
325    public Connection createConnection(Configuration conf) throws IOException {
326      return this.services.createConnection(conf);
327    }
328
329    @Override
330    public MetricRegistry getMetricRegistryForRegionServer() {
331      return metricRegistry;
332    }
333
334    @Override
335    public void shutdown() {
336      super.shutdown();
337      MetricsCoprocessor.removeRegistry(metricRegistry);
338    }
339  }
340
341  /**
342   * Special version of RegionServerEnvironment that exposes RegionServerServices for Core
343   * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core.
344   */
345  private static class RegionServerEnvironmentForCoreCoprocessors extends RegionServerEnvironment
346    implements HasRegionServerServices {
347    final RegionServerServices regionServerServices;
348
349    public RegionServerEnvironmentForCoreCoprocessors(final RegionServerCoprocessor impl,
350      final int priority, final int seq, final Configuration conf,
351      final RegionServerServices services) {
352      super(impl, priority, seq, conf, services);
353      this.regionServerServices = services;
354    }
355
356    /**
357     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
358     *         consumption.
359     */
360    @Override
361    public RegionServerServices getRegionServerServices() {
362      return this.regionServerServices;
363    }
364  }
365}