001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.lang.reflect.InvocationTargetException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Map;
025import java.util.UUID;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.stream.Collectors;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.RawCellBuilder;
036import org.apache.hadoop.hbase.RawCellBuilderFactory;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.client.Append;
039import org.apache.hadoop.hbase.client.CheckAndMutate;
040import org.apache.hadoop.hbase.client.CheckAndMutateResult;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.Delete;
043import org.apache.hadoop.hbase.client.Get;
044import org.apache.hadoop.hbase.client.Increment;
045import org.apache.hadoop.hbase.client.Mutation;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.Scan;
050import org.apache.hadoop.hbase.client.SharedConnection;
051import org.apache.hadoop.hbase.client.TableDescriptor;
052import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
053import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
054import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
055import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
056import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
057import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
058import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
059import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
060import org.apache.hadoop.hbase.coprocessor.ObserverContext;
061import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
062import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
063import org.apache.hadoop.hbase.coprocessor.RegionObserver;
064import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
065import org.apache.hadoop.hbase.io.Reference;
066import org.apache.hadoop.hbase.io.hfile.CacheConfig;
067import org.apache.hadoop.hbase.metrics.MetricRegistry;
068import org.apache.hadoop.hbase.quotas.OperationQuota;
069import org.apache.hadoop.hbase.quotas.RpcQuotaManager;
070import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
071import org.apache.hadoop.hbase.regionserver.Region.Operation;
072import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
073import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
074import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
075import org.apache.hadoop.hbase.security.User;
076import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
077import org.apache.hadoop.hbase.util.Pair;
078import org.apache.hadoop.hbase.wal.WALEdit;
079import org.apache.hadoop.hbase.wal.WALKey;
080import org.apache.yetus.audience.InterfaceAudience;
081import org.slf4j.Logger;
082import org.slf4j.LoggerFactory;
083
084import org.apache.hbase.thirdparty.com.google.protobuf.Message;
085import org.apache.hbase.thirdparty.com.google.protobuf.Service;
086import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
087import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;
088
089import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
091
092/**
093 * Implements the coprocessor environment and runtime support for coprocessors loaded within a
094 * {@link Region}.
095 */
096@InterfaceAudience.Private
097public class RegionCoprocessorHost
098  extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> {
099
  /** Logger for this host. */
  private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class);
  // The shared data map: one ConcurrentMap per coprocessor class name (the key used by
  // createEnvironment). Keys are referenced HARD and values WEAK, so an entry only stays in the
  // map while something else still holds on to its value.
  private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP =
    new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD,
      AbstractReferenceMap.ReferenceStrength.WEAK);

  // optimization: no need to call postScannerFilterRow, if no coprocessor implements it.
  // Computed once in the constructor, after all coprocessors have been loaded.
  private final boolean hasCustomPostScannerFilterRow;
108
109  /*
110   * Whether any configured CPs override postScannerFilterRow hook
111   */
112  public boolean hasCustomPostScannerFilterRow() {
113    return hasCustomPostScannerFilterRow;
114  }
115
116  /**
117   * Encapsulation of the environment of each coprocessor
118   */
119  private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
120    implements RegionCoprocessorEnvironment {
121    private Region region;
122    ConcurrentMap<String, Object> sharedData;
123    private final MetricRegistry metricRegistry;
124    private final RegionServerServices services;
125    private final RpcQuotaManager rpcQuotaManager;
126
127    /**
128     * Constructor
129     * @param impl     the coprocessor instance
130     * @param priority chaining priority
131     */
132    public RegionEnvironment(final RegionCoprocessor impl, final int priority, final int seq,
133      final Configuration conf, final Region region, final RegionServerServices services,
134      final ConcurrentMap<String, Object> sharedData) {
135      super(impl, priority, seq, conf);
136      this.region = region;
137      this.sharedData = sharedData;
138      this.services = services;
139      this.metricRegistry =
140        MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
141      // Some unit tests reach this line with services == null, and are okay with rpcQuotaManager
142      // being null. Let these unit tests succeed. This should not happen in real usage.
143      if (services != null) {
144        this.rpcQuotaManager = services.getRegionServerRpcQuotaManager();
145      } else {
146        this.rpcQuotaManager = null;
147      }
148    }
149
150    /** Returns the region */
151    @Override
152    public Region getRegion() {
153      return region;
154    }
155
156    @Override
157    public OnlineRegions getOnlineRegions() {
158      return this.services;
159    }
160
161    @Override
162    public Connection getConnection() {
163      // Mocks may have services as null at test time.
164      return services != null ? new SharedConnection(services.getConnection()) : null;
165    }
166
167    @Override
168    public Connection createConnection(Configuration conf) throws IOException {
169      return services != null ? this.services.createConnection(conf) : null;
170    }
171
172    @Override
173    public ServerName getServerName() {
174      return services != null ? services.getServerName() : null;
175    }
176
177    @Override
178    public void shutdown() {
179      super.shutdown();
180      MetricsCoprocessor.removeRegistry(this.metricRegistry);
181    }
182
183    @Override
184    public ConcurrentMap<String, Object> getSharedData() {
185      return sharedData;
186    }
187
188    @Override
189    public RegionInfo getRegionInfo() {
190      return region.getRegionInfo();
191    }
192
193    @Override
194    public MetricRegistry getMetricRegistryForRegionServer() {
195      return metricRegistry;
196    }
197
198    @Override
199    public RawCellBuilder getCellBuilder() {
200      // We always do a DEEP_COPY only
201      return RawCellBuilderFactory.create();
202    }
203
204    @Override
205    public RpcQuotaManager getRpcQuotaManager() {
206      return rpcQuotaManager;
207    }
208
209    @Override
210    public OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned,
211      long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException {
212      ClientProtos.ScanRequest scanRequest = RequestConverter
213        .buildScanRequest(region.getRegionInfo().getRegionName(), scan, scan.getCaching(), false);
214      long maxScannerResultSize =
215        services.getConfiguration().getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
216          HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
217      return rpcQuotaManager.checkScanQuota(region, scanRequest, maxScannerResultSize,
218        maxBlockBytesScanned, prevBlockBytesScannedDifference);
219    }
220
221    @Override
222    public OperationQuota checkBatchQuota(Region region, OperationQuota.OperationType type)
223      throws IOException, RpcThrottlingException {
224      return rpcQuotaManager.checkBatchQuota(region, type);
225    }
226
227    @Override
228    public OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads)
229      throws IOException, RpcThrottlingException {
230      return rpcQuotaManager.checkBatchQuota(region, numWrites, numReads);
231    }
232  }
233
234  /**
235   * Special version of RegionEnvironment that exposes RegionServerServices for Core Coprocessors
236   * only. Temporary hack until Core Coprocessors are integrated into Core.
237   */
238  private static class RegionEnvironmentForCoreCoprocessors extends RegionEnvironment
239    implements HasRegionServerServices {
240    private final RegionServerServices rsServices;
241
242    public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority,
243      final int seq, final Configuration conf, final Region region,
244      final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
245      super(impl, priority, seq, conf, region, services, sharedData);
246      this.rsServices = services;
247    }
248
249    /**
250     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
251     *         consumption.
252     */
253    @Override
254    public RegionServerServices getRegionServerServices() {
255      return this.rsServices;
256    }
257  }
258
259  static class TableCoprocessorAttribute {
260    private Path path;
261    private String className;
262    private int priority;
263    private Configuration conf;
264
265    public TableCoprocessorAttribute(Path path, String className, int priority,
266      Configuration conf) {
267      this.path = path;
268      this.className = className;
269      this.priority = priority;
270      this.conf = conf;
271    }
272
273    public Path getPath() {
274      return path;
275    }
276
277    public String getClassName() {
278      return className;
279    }
280
281    public int getPriority() {
282      return priority;
283    }
284
285    public Configuration getConf() {
286      return conf;
287    }
288  }
289
  /** The region server services; assigned in the constructor and passed to new environments. */
  RegionServerServices rsServices;
  /** The region this host serves; assigned in the constructor. */
  HRegion region;
294
295  /**
296   * Constructor
297   * @param region     the region
298   * @param rsServices interface to available region server functionality
299   * @param conf       the configuration
300   */
301  @SuppressWarnings("ReturnValueIgnored") // Checking method exists as CPU optimization
302  public RegionCoprocessorHost(final HRegion region, final RegionServerServices rsServices,
303    final Configuration conf) {
304    super(rsServices);
305    this.conf = conf;
306    this.rsServices = rsServices;
307    this.region = region;
308    this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
309
310    // load system default cp's from configuration.
311    loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
312
313    // load system default cp's for user tables from configuration.
314    if (!region.getRegionInfo().getTable().isSystemTable()) {
315      loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
316    }
317
318    // load Coprocessor From HDFS
319    loadTableCoprocessors(conf);
320
321    // now check whether any coprocessor implements postScannerFilterRow
322    boolean hasCustomPostScannerFilterRow = false;
323    out: for (RegionCoprocessorEnvironment env : coprocEnvironments) {
324      if (env.getInstance() instanceof RegionObserver) {
325        Class<?> clazz = env.getInstance().getClass();
326        for (;;) {
327          if (clazz == Object.class) {
328            // we dont need to look postScannerFilterRow into Object class
329            break; // break the inner loop
330          }
331          try {
332            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
333              InternalScanner.class, Cell.class, boolean.class);
334            // this coprocessor has a custom version of postScannerFilterRow
335            hasCustomPostScannerFilterRow = true;
336            break out;
337          } catch (NoSuchMethodException ignore) {
338          }
339          // the deprecated signature still exists
340          try {
341            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
342              InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
343            // this coprocessor has a custom version of postScannerFilterRow
344            hasCustomPostScannerFilterRow = true;
345            break out;
346          } catch (NoSuchMethodException ignore) {
347          }
348          clazz = clazz.getSuperclass();
349        }
350      }
351    }
352    this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
353  }
354
355  static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
356    TableDescriptor htd) {
357    return htd.getCoprocessorDescriptors().stream().map(cp -> {
358      Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null);
359      Configuration ourConf;
360      if (!cp.getProperties().isEmpty()) {
361        // do an explicit deep copy of the passed configuration
362        ourConf = new Configuration(false);
363        HBaseConfiguration.merge(ourConf, conf);
364        cp.getProperties().forEach((k, v) -> ourConf.set(k, v));
365      } else {
366        ourConf = conf;
367      }
368      return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf);
369    }).collect(Collectors.toList());
370  }
371
372  /**
373   * Sanity check the table coprocessor attributes of the supplied schema. Will throw an exception
374   * if there is a problem.
375   */
376  public static void testTableCoprocessorAttrs(final Configuration conf, final TableDescriptor htd)
377    throws IOException {
378    String pathPrefix = UUID.randomUUID().toString();
379    for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf, htd)) {
380      if (attr.getPriority() < 0) {
381        throw new IOException(
382          "Priority for coprocessor " + attr.getClassName() + " cannot be less than 0");
383      }
384      ClassLoader old = Thread.currentThread().getContextClassLoader();
385      try {
386        ClassLoader cl;
387        if (attr.getPath() != null) {
388          cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
389            CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
390        } else {
391          cl = CoprocessorHost.class.getClassLoader();
392        }
393        Thread.currentThread().setContextClassLoader(cl);
394        if (cl instanceof CoprocessorClassLoader) {
395          String[] includedClassPrefixes = null;
396          if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) {
397            String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY);
398            includedClassPrefixes = prefixes.split(";");
399          }
400          ((CoprocessorClassLoader) cl).loadClass(attr.getClassName(), includedClassPrefixes);
401        } else {
402          cl.loadClass(attr.getClassName());
403        }
404      } catch (ClassNotFoundException e) {
405        throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
406      } finally {
407        Thread.currentThread().setContextClassLoader(old);
408      }
409    }
410  }
411
412  void loadTableCoprocessors(final Configuration conf) {
413    boolean coprocessorsEnabled =
414      conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
415    boolean tableCoprocessorsEnabled =
416      conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED);
417    if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
418      return;
419    }
420
421    // scan the table attributes for coprocessor load specifications
422    // initialize the coprocessors
423    List<RegionCoprocessorEnvironment> configured = new ArrayList<>();
424    for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf,
425      region.getTableDescriptor())) {
426      // Load encompasses classloading and coprocessor initialization
427      try {
428        RegionCoprocessorEnvironment env =
429          load(attr.getPath(), attr.getClassName(), attr.getPriority(), attr.getConf());
430        if (env == null) {
431          continue;
432        }
433        configured.add(env);
434        LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of "
435          + region.getTableDescriptor().getTableName().getNameAsString() + " successfully.");
436      } catch (Throwable t) {
437        // Coprocessor failed to load, do we abort on error?
438        if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
439          abortServer(attr.getClassName(), t);
440        } else {
441          LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
442        }
443      }
444    }
445    // add together to coprocessor set for COW efficiency
446    coprocEnvironments.addAll(configured);
447  }
448
449  @Override
450  public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq,
451    Configuration conf) {
452    // If coprocessor exposes any services, register them.
453    for (Service service : instance.getServices()) {
454      region.registerService(service);
455    }
456    ConcurrentMap<String, Object> classData;
457    // make sure only one thread can add maps
458    synchronized (SHARED_DATA_MAP) {
459      // as long as at least one RegionEnvironment holds on to its classData it will
460      // remain in this map
461      classData = SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(),
462        k -> new ConcurrentHashMap<>());
463    }
464    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
465    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)
466      ? new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, rsServices,
467        classData)
468      : new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData);
469  }
470
471  @Override
472  public RegionCoprocessor checkAndGetInstance(Class<?> implClass)
473    throws InstantiationException, IllegalAccessException {
474    try {
475      if (RegionCoprocessor.class.isAssignableFrom(implClass)) {
476        return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance();
477      } else {
478        LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}",
479          implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
480        return null;
481      }
482    } catch (NoSuchMethodException | InvocationTargetException e) {
483      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
484    }
485  }
486
  // Extracts the RegionObserver from a RegionCoprocessor; handed to the observer operations
  // created by this host.
  private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter =
    RegionCoprocessor::getRegionObserver;

  // Extracts the EndpointObserver from a RegionCoprocessor.
  private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter =
    RegionCoprocessor::getEndpointObserver;
492
493  abstract class RegionObserverOperationWithoutResult
494    extends ObserverOperationWithoutResult<RegionObserver> {
495    public RegionObserverOperationWithoutResult() {
496      super(regionObserverGetter);
497    }
498
499    public RegionObserverOperationWithoutResult(User user) {
500      super(regionObserverGetter, user);
501    }
502
503    public RegionObserverOperationWithoutResult(boolean bypassable) {
504      super(regionObserverGetter, null, bypassable);
505    }
506
507    public RegionObserverOperationWithoutResult(User user, boolean bypassable) {
508      super(regionObserverGetter, user, bypassable);
509    }
510  }
511
512  abstract class BulkLoadObserverOperation
513    extends ObserverOperationWithoutResult<BulkLoadObserver> {
514    public BulkLoadObserverOperation(User user) {
515      super(RegionCoprocessor::getBulkLoadObserver, user);
516    }
517  }
518
  //////////////////////////////////////////////////////////////////////////////////////////////////
  // Observer operations
  //////////////////////////////////////////////////////////////////////////////////////////////////
526
527  /**
528   * Invoked before a region open.
529   * @throws IOException Signals that an I/O exception has occurred.
530   */
531  public void preOpen() throws IOException {
532    if (coprocEnvironments.isEmpty()) {
533      return;
534    }
535    execOperation(new RegionObserverOperationWithoutResult() {
536      @Override
537      public void call(RegionObserver observer) throws IOException {
538        observer.preOpen(this);
539      }
540    });
541  }
542
543  /**
544   * Invoked after a region open
545   */
546  public void postOpen() {
547    if (coprocEnvironments.isEmpty()) {
548      return;
549    }
550    try {
551      execOperation(new RegionObserverOperationWithoutResult() {
552        @Override
553        public void call(RegionObserver observer) throws IOException {
554          observer.postOpen(this);
555        }
556      });
557    } catch (IOException e) {
558      LOG.warn(e.toString(), e);
559    }
560  }
561
562  /**
563   * Invoked before a region is closed
564   * @param abortRequested true if the server is aborting
565   */
566  public void preClose(final boolean abortRequested) throws IOException {
567    execOperation(new RegionObserverOperationWithoutResult() {
568      @Override
569      public void call(RegionObserver observer) throws IOException {
570        observer.preClose(this, abortRequested);
571      }
572    });
573  }
574
575  /**
576   * Invoked after a region is closed
577   * @param abortRequested true if the server is aborting
578   */
579  public void postClose(final boolean abortRequested) {
580    try {
581      execOperation(new RegionObserverOperationWithoutResult() {
582        @Override
583        public void call(RegionObserver observer) throws IOException {
584          observer.postClose(this, abortRequested);
585        }
586
587        @Override
588        public void postEnvCall() {
589          shutdown(this.getEnvironment());
590        }
591      });
592    } catch (IOException e) {
593      LOG.warn(e.toString(), e);
594    }
595  }
596
597  /**
598   * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently
599   * available candidates.
600   * <p>
601   * Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed the
602   * passed in <code>candidates</code>.
603   * @param store      The store where compaction is being requested
604   * @param candidates The currently available store files
605   * @param tracker    used to track the life cycle of a compaction
606   * @param user       the user
607   */
608  public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates,
609    final CompactionLifeCycleTracker tracker, final User user) throws IOException {
610    if (coprocEnvironments.isEmpty()) {
611      return false;
612    }
613    boolean bypassable = true;
614    return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) {
615      @Override
616      public void call(RegionObserver observer) throws IOException {
617        observer.preCompactSelection(this, store, candidates, tracker);
618      }
619    });
620  }
621
622  /**
623   * Called after the {@link HStoreFile}s to be compacted have been selected from the available
624   * candidates.
625   * @param store    The store where compaction is being requested
626   * @param selected The store files selected to compact
627   * @param tracker  used to track the life cycle of a compaction
628   * @param request  the compaction request
629   * @param user     the user
630   */
631  public void postCompactSelection(final HStore store, final List<HStoreFile> selected,
632    final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
633    throws IOException {
634    if (coprocEnvironments.isEmpty()) {
635      return;
636    }
637    execOperation(new RegionObserverOperationWithoutResult(user) {
638      @Override
639      public void call(RegionObserver observer) throws IOException {
640        observer.postCompactSelection(this, store, selected, tracker, request);
641      }
642    });
643  }
644
645  /**
646   * Called prior to opening store scanner for compaction.
647   */
648  public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType,
649    CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException {
650    if (coprocEnvironments.isEmpty()) {
651      return store.getScanInfo();
652    }
653    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
654    execOperation(new RegionObserverOperationWithoutResult(user) {
655      @Override
656      public void call(RegionObserver observer) throws IOException {
657        observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request);
658      }
659    });
660    return builder.build();
661  }
662
663  /**
664   * Called prior to rewriting the store files selected for compaction
665   * @param store    the store being compacted
666   * @param scanner  the scanner used to read store data during compaction
667   * @param scanType type of Scan
668   * @param tracker  used to track the life cycle of a compaction
669   * @param request  the compaction request
670   * @param user     the user
671   * @return Scanner to use (cannot be null!)
672   */
673  public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
674    final ScanType scanType, final CompactionLifeCycleTracker tracker,
675    final CompactionRequest request, final User user) throws IOException {
676    InternalScanner defaultResult = scanner;
677    if (coprocEnvironments.isEmpty()) {
678      return defaultResult;
679    }
680    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
681      regionObserverGetter, defaultResult, user) {
682      @Override
683      public InternalScanner call(RegionObserver observer) throws IOException {
684        InternalScanner scanner =
685          observer.preCompact(this, store, getResult(), scanType, tracker, request);
686        if (scanner == null) {
687          throw new CoprocessorException("Null Scanner return disallowed!");
688        }
689        return scanner;
690      }
691    });
692  }
693
694  /**
695   * Called after the store compaction has completed.
696   * @param store      the store being compacted
697   * @param resultFile the new store file written during compaction
698   * @param tracker    used to track the life cycle of a compaction
699   * @param request    the compaction request
700   * @param user       the user
701   */
702  public void postCompact(final HStore store, final HStoreFile resultFile,
703    final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
704    throws IOException {
705    execOperation(
706      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(user) {
707        @Override
708        public void call(RegionObserver observer) throws IOException {
709          observer.postCompact(this, store, resultFile, tracker, request);
710        }
711      });
712  }
713
714  /**
715   * Invoked before create StoreScanner for flush.
716   */
717  public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
718    throws IOException {
719    if (coprocEnvironments.isEmpty()) {
720      return store.getScanInfo();
721    }
722    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
723    execOperation(new RegionObserverOperationWithoutResult() {
724      @Override
725      public void call(RegionObserver observer) throws IOException {
726        observer.preFlushScannerOpen(this, store, builder, tracker);
727      }
728    });
729    return builder.build();
730  }
731
732  /**
733   * Invoked before a memstore flush
734   * @return Scanner to use (cannot be null!)
735   */
736  public InternalScanner preFlush(HStore store, InternalScanner scanner,
737    FlushLifeCycleTracker tracker) throws IOException {
738    if (coprocEnvironments.isEmpty()) {
739      return scanner;
740    }
741    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
742      regionObserverGetter, scanner) {
743      @Override
744      public InternalScanner call(RegionObserver observer) throws IOException {
745        InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker);
746        if (scanner == null) {
747          throw new CoprocessorException("Null Scanner return disallowed!");
748        }
749        return scanner;
750      }
751    });
752  }
753
754  /**
755   * Invoked before a memstore flush
756   */
757  public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
758    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
759      @Override
760      public void call(RegionObserver observer) throws IOException {
761        observer.preFlush(this, tracker);
762      }
763    });
764  }
765
766  /**
767   * Invoked after a memstore flush
768   */
769  public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
770    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
771      @Override
772      public void call(RegionObserver observer) throws IOException {
773        observer.postFlush(this, tracker);
774      }
775    });
776  }
777
778  /**
779   * Invoked before in memory compaction.
780   */
781  public void preMemStoreCompaction(HStore store) throws IOException {
782    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
783      @Override
784      public void call(RegionObserver observer) throws IOException {
785        observer.preMemStoreCompaction(this, store);
786      }
787    });
788  }
789
790  /**
791   * Invoked before create StoreScanner for in memory compaction.
792   */
793  public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
794    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
795    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
796      @Override
797      public void call(RegionObserver observer) throws IOException {
798        observer.preMemStoreCompactionCompactScannerOpen(this, store, builder);
799      }
800    });
801    return builder.build();
802  }
803
804  /**
805   * Invoked before compacting memstore.
806   */
807  public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
808    throws IOException {
809    if (coprocEnvironments.isEmpty()) {
810      return scanner;
811    }
812    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
813      regionObserverGetter, scanner) {
814      @Override
815      public InternalScanner call(RegionObserver observer) throws IOException {
816        return observer.preMemStoreCompactionCompact(this, store, getResult());
817      }
818    });
819  }
820
821  /**
822   * Invoked after in memory compaction.
823   */
824  public void postMemStoreCompaction(HStore store) throws IOException {
825    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
826      @Override
827      public void call(RegionObserver observer) throws IOException {
828        observer.postMemStoreCompaction(this, store);
829      }
830    });
831  }
832
833  /**
834   * Invoked after a memstore flush
835   */
836  public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker)
837    throws IOException {
838    if (coprocEnvironments.isEmpty()) {
839      return;
840    }
841    execOperation(new RegionObserverOperationWithoutResult() {
842      @Override
843      public void call(RegionObserver observer) throws IOException {
844        observer.postFlush(this, store, storeFile, tracker);
845      }
846    });
847  }
848
849  // RegionObserver support
850  /**
851   * Supports Coprocessor 'bypass'.
852   * @param get     the Get request
853   * @param results What to return if return is true/'bypass'.
854   * @return true if default processing should be bypassed.
855   * @exception IOException Exception
856   */
857  public boolean preGet(final Get get, final List<Cell> results) throws IOException {
858    if (coprocEnvironments.isEmpty()) {
859      return false;
860    }
861    boolean bypassable = true;
862    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
863      @Override
864      public void call(RegionObserver observer) throws IOException {
865        observer.preGetOp(this, get, results);
866      }
867    });
868  }
869
870  /**
871   * @param get     the Get request
872   * @param results the result set
873   * @exception IOException Exception
874   */
875  public void postGet(final Get get, final List<Cell> results) throws IOException {
876    if (coprocEnvironments.isEmpty()) {
877      return;
878    }
879    execOperation(new RegionObserverOperationWithoutResult() {
880      @Override
881      public void call(RegionObserver observer) throws IOException {
882        observer.postGetOp(this, get, results);
883      }
884    });
885  }
886
887  /**
888   * Supports Coprocessor 'bypass'.
889   * @param get the Get request
890   * @return true or false to return to client if bypassing normal operation, or null otherwise
891   * @exception IOException Exception
892   */
893  public Boolean preExists(final Get get) throws IOException {
894    boolean bypassable = true;
895    boolean defaultResult = false;
896    if (coprocEnvironments.isEmpty()) {
897      return null;
898    }
899    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
900      regionObserverGetter, defaultResult, bypassable) {
901      @Override
902      public Boolean call(RegionObserver observer) throws IOException {
903        return observer.preExists(this, get, getResult());
904      }
905    });
906  }
907
908  /**
909   * @param get    the Get request
910   * @param result the result returned by the region server
911   * @return the result to return to the client
912   * @exception IOException Exception
913   */
914  public boolean postExists(final Get get, boolean result) throws IOException {
915    if (this.coprocEnvironments.isEmpty()) {
916      return result;
917    }
918    return execOperationWithResult(
919      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
920        @Override
921        public Boolean call(RegionObserver observer) throws IOException {
922          return observer.postExists(this, get, getResult());
923        }
924      });
925  }
926
927  /**
928   * Supports Coprocessor 'bypass'.
929   * @param put  The Put object
930   * @param edit The WALEdit object.
931   * @return true if default processing should be bypassed
932   * @exception IOException Exception
933   */
934  public boolean prePut(final Put put, final WALEdit edit) throws IOException {
935    if (coprocEnvironments.isEmpty()) {
936      return false;
937    }
938    boolean bypassable = true;
939    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
940      @Override
941      public void call(RegionObserver observer) throws IOException {
942        observer.prePut(this, put, edit);
943      }
944    });
945  }
946
947  /**
948   * Supports Coprocessor 'bypass'.
949   * @param mutation - the current mutation
950   * @param kv       - the current cell
951   * @param byteNow  - current timestamp in bytes
952   * @param get      - the get that could be used Note that the get only does not specify the family
953   *                 and qualifier that should be used
954   * @return true if default processing should be bypassed
955   */
956  public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, final Cell kv,
957    final byte[] byteNow, final Get get) throws IOException {
958    if (coprocEnvironments.isEmpty()) {
959      return false;
960    }
961    boolean bypassable = true;
962    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
963      @Override
964      public void call(RegionObserver observer) throws IOException {
965        observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get);
966      }
967    });
968  }
969
970  /**
971   * @param put  The Put object
972   * @param edit The WALEdit object.
973   * @exception IOException Exception
974   */
975  public void postPut(final Put put, final WALEdit edit) throws IOException {
976    if (coprocEnvironments.isEmpty()) {
977      return;
978    }
979    execOperation(new RegionObserverOperationWithoutResult() {
980      @Override
981      public void call(RegionObserver observer) throws IOException {
982        observer.postPut(this, put, edit);
983      }
984    });
985  }
986
987  /**
988   * Supports Coprocessor 'bypass'.
989   * @param delete The Delete object
990   * @param edit   The WALEdit object.
991   * @return true if default processing should be bypassed
992   * @exception IOException Exception
993   */
994  public boolean preDelete(final Delete delete, final WALEdit edit) throws IOException {
995    if (this.coprocEnvironments.isEmpty()) {
996      return false;
997    }
998    boolean bypassable = true;
999    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
1000      @Override
1001      public void call(RegionObserver observer) throws IOException {
1002        observer.preDelete(this, delete, edit);
1003      }
1004    });
1005  }
1006
1007  /**
1008   * @param delete The Delete object
1009   * @param edit   The WALEdit object.
1010   * @exception IOException Exception
1011   */
1012  public void postDelete(final Delete delete, final WALEdit edit) throws IOException {
1013    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1014      @Override
1015      public void call(RegionObserver observer) throws IOException {
1016        observer.postDelete(this, delete, edit);
1017      }
1018    });
1019  }
1020
1021  public void preBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp)
1022    throws IOException {
1023    if (this.coprocEnvironments.isEmpty()) {
1024      return;
1025    }
1026    execOperation(new RegionObserverOperationWithoutResult() {
1027      @Override
1028      public void call(RegionObserver observer) throws IOException {
1029        observer.preBatchMutate(this, miniBatchOp);
1030      }
1031    });
1032  }
1033
1034  public void postBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp)
1035    throws IOException {
1036    if (this.coprocEnvironments.isEmpty()) {
1037      return;
1038    }
1039    execOperation(new RegionObserverOperationWithoutResult() {
1040      @Override
1041      public void call(RegionObserver observer) throws IOException {
1042        observer.postBatchMutate(this, miniBatchOp);
1043      }
1044    });
1045  }
1046
1047  public void postBatchMutateIndispensably(final MiniBatchOperationInProgress<Mutation> miniBatchOp,
1048    final boolean success) throws IOException {
1049    if (this.coprocEnvironments.isEmpty()) {
1050      return;
1051    }
1052    execOperation(new RegionObserverOperationWithoutResult() {
1053      @Override
1054      public void call(RegionObserver observer) throws IOException {
1055        observer.postBatchMutateIndispensably(this, miniBatchOp, success);
1056      }
1057    });
1058  }
1059
1060  /**
1061   * Supports Coprocessor 'bypass'.
1062   * @param checkAndMutate the CheckAndMutate object
1063   * @return true or false to return to client if default processing should be bypassed, or null
1064   *         otherwise
1065   * @throws IOException if an error occurred on the coprocessor
1066   */
1067  public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate) throws IOException {
1068    boolean bypassable = true;
1069    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1070    if (coprocEnvironments.isEmpty()) {
1071      return null;
1072    }
1073    return execOperationWithResult(
1074      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
1075        defaultResult, bypassable) {
1076        @Override
1077        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1078          return observer.preCheckAndMutate(this, checkAndMutate, getResult());
1079        }
1080      });
1081  }
1082
1083  /**
1084   * Supports Coprocessor 'bypass'.
1085   * @param checkAndMutate the CheckAndMutate object
1086   * @return true or false to return to client if default processing should be bypassed, or null
1087   *         otherwise
1088   * @throws IOException if an error occurred on the coprocessor
1089   */
1090  public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate)
1091    throws IOException {
1092    boolean bypassable = true;
1093    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1094    if (coprocEnvironments.isEmpty()) {
1095      return null;
1096    }
1097    return execOperationWithResult(
1098      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
1099        defaultResult, bypassable) {
1100        @Override
1101        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1102          return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult());
1103        }
1104      });
1105  }
1106
1107  /**
1108   * @param checkAndMutate the CheckAndMutate object
1109   * @param result         the result returned by the checkAndMutate
1110   * @return true or false to return to client if default processing should be bypassed, or null
1111   *         otherwise
1112   * @throws IOException if an error occurred on the coprocessor
1113   */
1114  public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate,
1115    CheckAndMutateResult result) throws IOException {
1116    if (this.coprocEnvironments.isEmpty()) {
1117      return result;
1118    }
1119    return execOperationWithResult(
1120      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
1121        result) {
1122        @Override
1123        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1124          return observer.postCheckAndMutate(this, checkAndMutate, getResult());
1125        }
1126      });
1127  }
1128
1129  /**
1130   * Supports Coprocessor 'bypass'.
1131   * @param append append object
1132   * @param edit   The WALEdit object.
1133   * @return result to return to client if default operation should be bypassed, null otherwise
1134   * @throws IOException if an error occurred on the coprocessor
1135   */
1136  public Result preAppend(final Append append, final WALEdit edit) throws IOException {
1137    boolean bypassable = true;
1138    Result defaultResult = null;
1139    if (this.coprocEnvironments.isEmpty()) {
1140      return defaultResult;
1141    }
1142    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1143      regionObserverGetter, defaultResult, bypassable) {
1144      @Override
1145      public Result call(RegionObserver observer) throws IOException {
1146        return observer.preAppend(this, append, edit);
1147      }
1148    });
1149  }
1150
1151  /**
1152   * Supports Coprocessor 'bypass'.
1153   * @param append append object
1154   * @return result to return to client if default operation should be bypassed, null otherwise
1155   * @throws IOException if an error occurred on the coprocessor
1156   */
1157  public Result preAppendAfterRowLock(final Append append) throws IOException {
1158    boolean bypassable = true;
1159    Result defaultResult = null;
1160    if (this.coprocEnvironments.isEmpty()) {
1161      return defaultResult;
1162    }
1163    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1164      regionObserverGetter, defaultResult, bypassable) {
1165      @Override
1166      public Result call(RegionObserver observer) throws IOException {
1167        return observer.preAppendAfterRowLock(this, append);
1168      }
1169    });
1170  }
1171
1172  /**
1173   * Supports Coprocessor 'bypass'.
1174   * @param increment increment object
1175   * @param edit      The WALEdit object.
1176   * @return result to return to client if default operation should be bypassed, null otherwise
1177   * @throws IOException if an error occurred on the coprocessor
1178   */
1179  public Result preIncrement(final Increment increment, final WALEdit edit) throws IOException {
1180    boolean bypassable = true;
1181    Result defaultResult = null;
1182    if (coprocEnvironments.isEmpty()) {
1183      return defaultResult;
1184    }
1185    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1186      regionObserverGetter, defaultResult, bypassable) {
1187      @Override
1188      public Result call(RegionObserver observer) throws IOException {
1189        return observer.preIncrement(this, increment, edit);
1190      }
1191    });
1192  }
1193
1194  /**
1195   * Supports Coprocessor 'bypass'.
1196   * @param increment increment object
1197   * @return result to return to client if default operation should be bypassed, null otherwise
1198   * @throws IOException if an error occurred on the coprocessor
1199   */
1200  public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1201    boolean bypassable = true;
1202    Result defaultResult = null;
1203    if (coprocEnvironments.isEmpty()) {
1204      return defaultResult;
1205    }
1206    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1207      regionObserverGetter, defaultResult, bypassable) {
1208      @Override
1209      public Result call(RegionObserver observer) throws IOException {
1210        return observer.preIncrementAfterRowLock(this, increment);
1211      }
1212    });
1213  }
1214
1215  /**
1216   * @param append Append object
1217   * @param result the result returned by the append
1218   * @param edit   The WALEdit object.
1219   * @throws IOException if an error occurred on the coprocessor
1220   */
1221  public Result postAppend(final Append append, final Result result, final WALEdit edit)
1222    throws IOException {
1223    if (this.coprocEnvironments.isEmpty()) {
1224      return result;
1225    }
1226    return execOperationWithResult(
1227      new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1228        @Override
1229        public Result call(RegionObserver observer) throws IOException {
1230          return observer.postAppend(this, append, result, edit);
1231        }
1232      });
1233  }
1234
1235  /**
1236   * @param increment increment object
1237   * @param result    the result returned by postIncrement
1238   * @param edit      The WALEdit object.
1239   * @throws IOException if an error occurred on the coprocessor
1240   */
1241  public Result postIncrement(final Increment increment, Result result, final WALEdit edit)
1242    throws IOException {
1243    if (this.coprocEnvironments.isEmpty()) {
1244      return result;
1245    }
1246    return execOperationWithResult(
1247      new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1248        @Override
1249        public Result call(RegionObserver observer) throws IOException {
1250          return observer.postIncrement(this, increment, getResult(), edit);
1251        }
1252      });
1253  }
1254
1255  /**
1256   * @param scan the Scan specification
1257   * @exception IOException Exception
1258   */
1259  public void preScannerOpen(final Scan scan) throws IOException {
1260    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1261      @Override
1262      public void call(RegionObserver observer) throws IOException {
1263        observer.preScannerOpen(this, scan);
1264      }
1265    });
1266  }
1267
1268  /**
1269   * @param scan the Scan specification
1270   * @param s    the scanner
1271   * @return the scanner instance to use
1272   * @exception IOException Exception
1273   */
1274  public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1275    if (this.coprocEnvironments.isEmpty()) {
1276      return s;
1277    }
1278    return execOperationWithResult(
1279      new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) {
1280        @Override
1281        public RegionScanner call(RegionObserver observer) throws IOException {
1282          return observer.postScannerOpen(this, scan, getResult());
1283        }
1284      });
1285  }
1286
1287  /**
1288   * @param s       the scanner
1289   * @param results the result set returned by the region server
1290   * @param limit   the maximum number of results to return
1291   * @return 'has next' indication to client if bypassing default behavior, or null otherwise
1292   * @exception IOException Exception
1293   */
1294  public Boolean preScannerNext(final InternalScanner s, final List<Result> results,
1295    final int limit) throws IOException {
1296    boolean bypassable = true;
1297    boolean defaultResult = false;
1298    if (coprocEnvironments.isEmpty()) {
1299      return null;
1300    }
1301    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
1302      regionObserverGetter, defaultResult, bypassable) {
1303      @Override
1304      public Boolean call(RegionObserver observer) throws IOException {
1305        return observer.preScannerNext(this, s, results, limit, getResult());
1306      }
1307    });
1308  }
1309
1310  /**
1311   * @param s       the scanner
1312   * @param results the result set returned by the region server
1313   * @param limit   the maximum number of results to return
1314   * @return 'has more' indication to give to client
1315   * @exception IOException Exception
1316   */
1317  public boolean postScannerNext(final InternalScanner s, final List<Result> results,
1318    final int limit, boolean hasMore) throws IOException {
1319    if (this.coprocEnvironments.isEmpty()) {
1320      return hasMore;
1321    }
1322    return execOperationWithResult(
1323      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) {
1324        @Override
1325        public Boolean call(RegionObserver observer) throws IOException {
1326          return observer.postScannerNext(this, s, results, limit, getResult());
1327        }
1328      });
1329  }
1330
1331  /**
1332   * This will be called by the scan flow when the current scanned row is being filtered out by the
1333   * filter.
1334   * @param s          the scanner
1335   * @param curRowCell The cell in the current row which got filtered out
1336   * @return whether more rows are available for the scanner or not
1337   */
1338  public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell)
1339    throws IOException {
1340    // short circuit for performance
1341    boolean defaultResult = true;
1342    if (!hasCustomPostScannerFilterRow) {
1343      return defaultResult;
1344    }
1345    if (this.coprocEnvironments.isEmpty()) {
1346      return defaultResult;
1347    }
1348    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
1349      regionObserverGetter, defaultResult) {
1350      @Override
1351      public Boolean call(RegionObserver observer) throws IOException {
1352        return observer.postScannerFilterRow(this, s, curRowCell, getResult());
1353      }
1354    });
1355  }
1356
1357  /**
1358   * Supports Coprocessor 'bypass'.
1359   * @param s the scanner
1360   * @return true if default behavior should be bypassed, false otherwise
1361   * @exception IOException Exception
1362   */
1363  // Should this be bypassable?
1364  public boolean preScannerClose(final InternalScanner s) throws IOException {
1365    return execOperation(
1366      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
1367        @Override
1368        public void call(RegionObserver observer) throws IOException {
1369          observer.preScannerClose(this, s);
1370        }
1371      });
1372  }
1373
1374  /**
1375   * @exception IOException Exception
1376   */
1377  public void postScannerClose(final InternalScanner s) throws IOException {
1378    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1379      @Override
1380      public void call(RegionObserver observer) throws IOException {
1381        observer.postScannerClose(this, s);
1382      }
1383    });
1384  }
1385
1386  /**
1387   * Called before open store scanner for user scan.
1388   */
1389  public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException {
1390    if (coprocEnvironments.isEmpty()) return store.getScanInfo();
1391    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan);
1392    execOperation(new RegionObserverOperationWithoutResult() {
1393      @Override
1394      public void call(RegionObserver observer) throws IOException {
1395        observer.preStoreScannerOpen(this, store, builder);
1396      }
1397    });
1398    return builder.build();
1399  }
1400
1401  /**
1402   * @param info  the RegionInfo for this region
1403   * @param edits the file of recovered edits
1404   */
1405  public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1406    execOperation(
1407      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
1408        @Override
1409        public void call(RegionObserver observer) throws IOException {
1410          observer.preReplayWALs(this, info, edits);
1411        }
1412      });
1413  }
1414
1415  /**
1416   * @param info  the RegionInfo for this region
1417   * @param edits the file of recovered edits
1418   * @throws IOException Exception
1419   */
1420  public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1421    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1422      @Override
1423      public void call(RegionObserver observer) throws IOException {
1424        observer.postReplayWALs(this, info, edits);
1425      }
1426    });
1427  }
1428
1429  /**
1430   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1431   */
1432  public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1433    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1434      @Override
1435      public void call(RegionObserver observer) throws IOException {
1436        observer.preBulkLoadHFile(this, familyPaths);
1437      }
1438    });
1439  }
1440
1441  public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
1442    throws IOException {
1443    return execOperation(
1444      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1445        @Override
1446        public void call(RegionObserver observer) throws IOException {
1447          observer.preCommitStoreFile(this, family, pairs);
1448        }
1449      });
1450  }
1451
1452  public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath)
1453    throws IOException {
1454    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1455      @Override
1456      public void call(RegionObserver observer) throws IOException {
1457        observer.postCommitStoreFile(this, family, srcPath, dstPath);
1458      }
1459    });
1460  }
1461
1462  /**
1463   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1464   * @param map         Map of CF to List of file paths for the final loaded files
1465   */
1466  public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1467    Map<byte[], List<Path>> map) throws IOException {
1468    if (this.coprocEnvironments.isEmpty()) {
1469      return;
1470    }
1471    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1472      @Override
1473      public void call(RegionObserver observer) throws IOException {
1474        observer.postBulkLoadHFile(this, familyPaths, map);
1475      }
1476    });
1477  }
1478
1479  public void postStartRegionOperation(final Operation op) throws IOException {
1480    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1481      @Override
1482      public void call(RegionObserver observer) throws IOException {
1483        observer.postStartRegionOperation(this, op);
1484      }
1485    });
1486  }
1487
1488  public void postCloseRegionOperation(final Operation op) throws IOException {
1489    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1490      @Override
1491      public void call(RegionObserver observer) throws IOException {
1492        observer.postCloseRegionOperation(this, op);
1493      }
1494    });
1495  }
1496
1497  /**
1498   * @param fs   fileystem to read from
1499   * @param p    path to the file
1500   * @param in   {@link FSDataInputStreamWrapper}
1501   * @param size Full size of the file
1502   * @param r    original reference file. This will be not null only when reading a split file.
1503   * @return a Reader instance to use instead of the base reader if overriding default behavior,
1504   *         null otherwise
1505   */
1506  public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1507    final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1508    final Reference r) throws IOException {
1509    if (coprocEnvironments.isEmpty()) {
1510      return null;
1511    }
1512    return execOperationWithResult(
1513      new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) {
1514        @Override
1515        public StoreFileReader call(RegionObserver observer) throws IOException {
1516          return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult());
1517        }
1518      });
1519  }
1520
1521  /**
1522   * @param fs     fileystem to read from
1523   * @param p      path to the file
1524   * @param in     {@link FSDataInputStreamWrapper}
1525   * @param size   Full size of the file
1526   * @param r      original reference file. This will be not null only when reading a split file.
1527   * @param reader the base reader instance
1528   * @return The reader to use
1529   */
1530  public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1531    final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1532    final Reference r, final StoreFileReader reader) throws IOException {
1533    if (this.coprocEnvironments.isEmpty()) {
1534      return reader;
1535    }
1536    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, StoreFileReader>(
1537      regionObserverGetter, reader) {
1538      @Override
1539      public StoreFileReader call(RegionObserver observer) throws IOException {
1540        return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult());
1541      }
1542    });
1543  }
1544
1545  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
1546    final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1547    if (this.coprocEnvironments.isEmpty()) {
1548      return cellPairs;
1549    }
1550    return execOperationWithResult(
1551      new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter,
1552        cellPairs) {
1553        @Override
1554        public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1555          return observer.postIncrementBeforeWAL(this, mutation, getResult());
1556        }
1557      });
1558  }
1559
1560  public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
1561    final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1562    if (this.coprocEnvironments.isEmpty()) {
1563      return cellPairs;
1564    }
1565    return execOperationWithResult(
1566      new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter,
1567        cellPairs) {
1568        @Override
1569        public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1570          return observer.postAppendBeforeWAL(this, mutation, getResult());
1571        }
1572      });
1573  }
1574
1575  public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
1576    if (this.coprocEnvironments.isEmpty()) {
1577      return;
1578    }
1579    execOperation(new RegionObserverOperationWithoutResult() {
1580      @Override
1581      public void call(RegionObserver observer) throws IOException {
1582        observer.preWALAppend(this, key, edit);
1583      }
1584    });
1585  }
1586
1587  public Message preEndpointInvocation(final Service service, final String methodName,
1588    Message request) throws IOException {
1589    if (coprocEnvironments.isEmpty()) {
1590      return request;
1591    }
1592    return execOperationWithResult(
1593      new ObserverOperationWithResult<EndpointObserver, Message>(endpointObserverGetter, request) {
1594        @Override
1595        public Message call(EndpointObserver observer) throws IOException {
1596          return observer.preEndpointInvocation(this, service, methodName, getResult());
1597        }
1598      });
1599  }
1600
1601  public void postEndpointInvocation(final Service service, final String methodName,
1602    final Message request, final Message.Builder responseBuilder) throws IOException {
1603    execOperation(coprocEnvironments.isEmpty()
1604      ? null
1605      : new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) {
1606        @Override
1607        public void call(EndpointObserver observer) throws IOException {
1608          observer.postEndpointInvocation(this, service, methodName, request, responseBuilder);
1609        }
1610      });
1611  }
1612
1613  public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException {
1614    if (this.coprocEnvironments.isEmpty()) {
1615      return result;
1616    }
1617    return execOperationWithResult(
1618      new ObserverOperationWithResult<RegionObserver, DeleteTracker>(regionObserverGetter, result) {
1619        @Override
1620        public DeleteTracker call(RegionObserver observer) throws IOException {
1621          return observer.postInstantiateDeleteTracker(this, getResult());
1622        }
1623      });
1624  }
1625
1626  /////////////////////////////////////////////////////////////////////////////////////////////////
1627  // BulkLoadObserver hooks
1628  /////////////////////////////////////////////////////////////////////////////////////////////////
1629  public void prePrepareBulkLoad(User user) throws IOException {
1630    execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) {
1631      @Override
1632      protected void call(BulkLoadObserver observer) throws IOException {
1633        observer.prePrepareBulkLoad(this);
1634      }
1635    });
1636  }
1637
1638  public void preCleanupBulkLoad(User user) throws IOException {
1639    execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) {
1640      @Override
1641      protected void call(BulkLoadObserver observer) throws IOException {
1642        observer.preCleanupBulkLoad(this);
1643      }
1644    });
1645  }
1646}