001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.access;
019
020import com.google.protobuf.Message;
021import com.google.protobuf.RpcCallback;
022import com.google.protobuf.RpcController;
023import com.google.protobuf.Service;
024import java.io.IOException;
025import java.security.PrivilegedExceptionAction;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.Iterator;
031import java.util.List;
032import java.util.Map;
033import java.util.Map.Entry;
034import java.util.Optional;
035import java.util.Set;
036import java.util.TreeMap;
037import java.util.TreeSet;
038import java.util.stream.Collectors;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.ArrayBackedTag;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.CellScanner;
043import org.apache.hadoop.hbase.CellUtil;
044import org.apache.hadoop.hbase.CompareOperator;
045import org.apache.hadoop.hbase.CompoundConfiguration;
046import org.apache.hadoop.hbase.CoprocessorEnvironment;
047import org.apache.hadoop.hbase.DoNotRetryIOException;
048import org.apache.hadoop.hbase.HBaseInterfaceAudience;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.KeyValue;
051import org.apache.hadoop.hbase.KeyValue.Type;
052import org.apache.hadoop.hbase.NamespaceDescriptor;
053import org.apache.hadoop.hbase.PrivateCellUtil;
054import org.apache.hadoop.hbase.ServerName;
055import org.apache.hadoop.hbase.TableName;
056import org.apache.hadoop.hbase.Tag;
057import org.apache.hadoop.hbase.client.Admin;
058import org.apache.hadoop.hbase.client.Append;
059import org.apache.hadoop.hbase.client.BalanceRequest;
060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
062import org.apache.hadoop.hbase.client.Delete;
063import org.apache.hadoop.hbase.client.Durability;
064import org.apache.hadoop.hbase.client.Get;
065import org.apache.hadoop.hbase.client.Increment;
066import org.apache.hadoop.hbase.client.MasterSwitchType;
067import org.apache.hadoop.hbase.client.Mutation;
068import org.apache.hadoop.hbase.client.Put;
069import org.apache.hadoop.hbase.client.Query;
070import org.apache.hadoop.hbase.client.RegionInfo;
071import org.apache.hadoop.hbase.client.Result;
072import org.apache.hadoop.hbase.client.Scan;
073import org.apache.hadoop.hbase.client.SnapshotDescription;
074import org.apache.hadoop.hbase.client.Table;
075import org.apache.hadoop.hbase.client.TableDescriptor;
076import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
077import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
078import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
079import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
080import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
081import org.apache.hadoop.hbase.coprocessor.HasMasterServices;
082import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
083import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
084import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
085import org.apache.hadoop.hbase.coprocessor.MasterObserver;
086import org.apache.hadoop.hbase.coprocessor.ObserverContext;
087import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
088import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
089import org.apache.hadoop.hbase.coprocessor.RegionObserver;
090import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
091import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
092import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
093import org.apache.hadoop.hbase.filter.ByteArrayComparable;
094import org.apache.hadoop.hbase.filter.Filter;
095import org.apache.hadoop.hbase.filter.FilterList;
096import org.apache.hadoop.hbase.io.hfile.HFile;
097import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
098import org.apache.hadoop.hbase.ipc.RpcServer;
099import org.apache.hadoop.hbase.master.MasterServices;
100import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
101import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
102import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
103import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.HasPermissionRequest;
104import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.HasPermissionResponse;
105import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings;
106import org.apache.hadoop.hbase.regionserver.BloomType;
107import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
108import org.apache.hadoop.hbase.regionserver.InternalScanner;
109import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
110import org.apache.hadoop.hbase.regionserver.Region;
111import org.apache.hadoop.hbase.regionserver.RegionScanner;
112import org.apache.hadoop.hbase.regionserver.RegionServerServices;
113import org.apache.hadoop.hbase.regionserver.ScanType;
114import org.apache.hadoop.hbase.regionserver.ScannerContext;
115import org.apache.hadoop.hbase.regionserver.Store;
116import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
117import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
118import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
119import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
120import org.apache.hadoop.hbase.security.AccessDeniedException;
121import org.apache.hadoop.hbase.security.Superusers;
122import org.apache.hadoop.hbase.security.User;
123import org.apache.hadoop.hbase.security.UserProvider;
124import org.apache.hadoop.hbase.security.access.Permission.Action;
125import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
126import org.apache.hadoop.hbase.util.ByteRange;
127import org.apache.hadoop.hbase.util.Bytes;
128import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
129import org.apache.hadoop.hbase.util.Pair;
130import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
131import org.apache.hadoop.hbase.wal.WALEdit;
132import org.apache.yetus.audience.InterfaceAudience;
133import org.slf4j.Logger;
134import org.slf4j.LoggerFactory;
135
136import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
137import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
138import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
139import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
140import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
141
142import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
143
144/**
145 * Provides basic authorization checks for data access and administrative operations.
146 * <p>
147 * {@code AccessController} performs authorization checks for HBase operations based on:
148 * </p>
149 * <ul>
150 * <li>the identity of the user performing the operation</li>
151 * <li>the scope over which the operation is performed, in increasing specificity: global, table,
152 * column family, or qualifier</li>
153 * <li>the type of action being performed (as mapped to {@link Permission.Action} values)</li>
154 * </ul>
155 * <p>
156 * If the authorization check fails, an {@link AccessDeniedException} will be thrown for the
157 * operation.
158 * </p>
159 * <p>
160 * To perform authorization checks, {@code AccessController} relies on the RpcServerEngine being
161 * loaded to provide the user identities for remote requests.
162 * </p>
163 * <p>
164 * The access control lists used for authorization can be manipulated via the exposed
165 * {@link AccessControlService} Interface implementation, and the associated {@code grant},
166 * {@code revoke}, and {@code user_permission} HBase shell commands.
167 * </p>
168 */
169@CoreCoprocessor
170@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
171public class AccessController implements MasterCoprocessor, RegionCoprocessor,
172  RegionServerCoprocessor, AccessControlService.Interface, MasterObserver, RegionObserver,
173  RegionServerObserver, EndpointObserver, BulkLoadObserver {
174  // TODO: encapsulate observer functions into separate class/sub-class.
175
  /** General purpose logger for this class. */
  private static final Logger LOG = LoggerFactory.getLogger(AccessController.class);

  /**
   * Logger registered under the name {@code "SecurityLogger." + AccessController.class.getName()}.
   * NOTE(review): presumably used for audit records; its call sites are outside this section.
   */
  private static final Logger AUDITLOG =
    LoggerFactory.getLogger("SecurityLogger." + AccessController.class.getName());
  // NOTE(review): looks like an operation attribute name; usages are outside this section.
  private static final String CHECK_COVERING_PERM = "check_covering_perm";
  /** Mutation attribute name set once the reserved ACL tag check has passed for a mutation. */
  private static final String TAG_CHECK_PASSED = "tag_check_passed";
  /** Serialized boolean {@code true}, used as the value of marker attributes. */
  private static final byte[] TRUE = Bytes.toBytes(true);

  /** Performs the actual permission checks; obtained from the hosting server in start(). */
  private AccessChecker accessChecker;
  /** Used to write serialized permissions to ZooKeeper; obtained from the server in start(). */
  private ZKPermissionWatcher zkPermissionWatcher;

  /** flags if we are running on a region of the _acl_ table */
  private boolean aclRegion = false;

  /**
   * The region coprocessor environment. Assigned in start() only when this coprocessor is loaded
   * on a region; it gives the Endpoint implementation a way to access region services.
   */
  private RegionCoprocessorEnvironment regionEnv;

  /** Mapping of scanner instances to the user who created them (weak keys) */
  private Map<InternalScanner, String> scannerOwners = new MapMaker().weakKeys().makeMap();

  // Created in start() as a weak-valued map. NOTE(review): readers/writers are outside this
  // section; presumably a per-table cache of permissions - confirm.
  private Map<TableName, List<UserPermission>> tableAcls;

  /** Provider for mapping principal names to Users */
  private UserProvider userProvider;

  /**
   * if we are active, usually false, only true if "hbase.security.authorization" has been set to
   * true in site configuration
   */
  private boolean authorizationEnabled;

  /** if we are able to support cell ACLs */
  private boolean cellFeaturesEnabled;

  /** if we should check EXEC permissions */
  private boolean shouldCheckExecPermission;

  /**
   * if we should terminate access checks early as soon as table or CF grants allow access; pre-0.98
   * compatible behavior
   */
  private boolean compatibleEarlyTermination;

  /** if we have been successfully initialized */
  private volatile boolean initialized = false;

  /** if the ACL table is available, only relevant in the master */
  private volatile boolean aclTabAvailable = false;
226
227  public static boolean isCellAuthorizationSupported(Configuration conf) {
228    return AccessChecker.isAuthorizationSupported(conf)
229      && (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
230  }
231
232  public Region getRegion() {
233    return regionEnv != null ? regionEnv.getRegion() : null;
234  }
235
236  public AuthManager getAuthManager() {
237    return accessChecker.getAuthManager();
238  }
239
240  private void initialize(RegionCoprocessorEnvironment e) throws IOException {
241    final Region region = e.getRegion();
242    Configuration conf = e.getConfiguration();
243    Map<byte[], ListMultimap<String, UserPermission>> tables = PermissionStorage.loadAll(region);
244    // For each table, write out the table's permissions to the respective
245    // znode for that table.
246    for (Map.Entry<byte[], ListMultimap<String, UserPermission>> t : tables.entrySet()) {
247      byte[] entry = t.getKey();
248      ListMultimap<String, UserPermission> perms = t.getValue();
249      byte[] serialized = PermissionStorage.writePermissionsAsBytes(perms, conf);
250      zkPermissionWatcher.writeToZookeeper(entry, serialized);
251    }
252    initialized = true;
253  }
254
255  /**
256   * Writes all table ACLs for the tables in the given Map up into ZooKeeper znodes. This is called
257   * to synchronize ACL changes following {@code _acl_} table updates.
258   */
259  private void updateACL(RegionCoprocessorEnvironment e, final Map<byte[], List<Cell>> familyMap) {
260    Set<byte[]> entries = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
261    for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
262      List<Cell> cells = f.getValue();
263      for (Cell cell : cells) {
264        if (CellUtil.matchingFamily(cell, PermissionStorage.ACL_LIST_FAMILY)) {
265          entries.add(CellUtil.cloneRow(cell));
266        }
267      }
268    }
269    Configuration conf = regionEnv.getConfiguration();
270    byte[] currentEntry = null;
271    // TODO: Here we are already on the ACL region. (And it is single
272    // region) We can even just get the region from the env and do get
273    // directly. The short circuit connection would avoid the RPC overhead
274    // so no socket communication, req write/read .. But we have the PB
275    // to and fro conversion overhead. get req is converted to PB req
276    // and results are converted to PB results 1st and then to POJOs
277    // again. We could have avoided such at least in ACL table context..
278    try (Table t = e.getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
279      for (byte[] entry : entries) {
280        currentEntry = entry;
281        ListMultimap<String, UserPermission> perms =
282          PermissionStorage.getPermissions(conf, entry, t, null, null, null, false);
283        byte[] serialized = PermissionStorage.writePermissionsAsBytes(perms, conf);
284        zkPermissionWatcher.writeToZookeeper(entry, serialized);
285      }
286    } catch (IOException ex) {
287      LOG.error("Failed updating permissions mirror for '"
288        + (currentEntry == null ? "null" : Bytes.toString(currentEntry)) + "'", ex);
289    }
290  }
291
292  /**
293   * Check the current user for authorization to perform a specific action against the given set of
294   * row data.
295   * @param opType   the operation type
296   * @param user     the user
297   * @param e        the coprocessor environment
298   * @param families the map of column families to qualifiers present in the request
299   * @param actions  the desired actions
300   * @return an authorization result
301   */
302  private AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
303    Map<byte[], ? extends Collection<?>> families, Action... actions) {
304    AuthResult result = null;
305    for (Action action : actions) {
306      result = accessChecker.permissionGranted(opType.toString(), user, action,
307        e.getRegion().getRegionInfo().getTable(), families);
308      if (!result.isAllowed()) {
309        return result;
310      }
311    }
312    return result;
313  }
314
315  public void requireAccess(ObserverContext<?> ctx, String request, TableName tableName,
316    Action... permissions) throws IOException {
317    accessChecker.requireAccess(getActiveUser(ctx), request, tableName, permissions);
318  }
319
320  public void requirePermission(ObserverContext<?> ctx, String request, Action perm)
321    throws IOException {
322    accessChecker.requirePermission(getActiveUser(ctx), request, null, perm);
323  }
324
325  public void requireGlobalPermission(ObserverContext<?> ctx, String request, Action perm,
326    TableName tableName, Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
327    accessChecker.requireGlobalPermission(getActiveUser(ctx), request, perm, tableName, familyMap,
328      null);
329  }
330
331  public void requireGlobalPermission(ObserverContext<?> ctx, String request, Action perm,
332    String namespace) throws IOException {
333    accessChecker.requireGlobalPermission(getActiveUser(ctx), request, perm, namespace);
334  }
335
336  public void requireNamespacePermission(ObserverContext<?> ctx, String request, String namespace,
337    Action... permissions) throws IOException {
338    accessChecker.requireNamespacePermission(getActiveUser(ctx), request, namespace, null,
339      permissions);
340  }
341
342  public void requireNamespacePermission(ObserverContext<?> ctx, String request, String namespace,
343    TableName tableName, Map<byte[], ? extends Collection<byte[]>> familyMap, Action... permissions)
344    throws IOException {
345    accessChecker.requireNamespacePermission(getActiveUser(ctx), request, namespace, tableName,
346      familyMap, permissions);
347  }
348
349  public void requirePermission(ObserverContext<?> ctx, String request, TableName tableName,
350    byte[] family, byte[] qualifier, Action... permissions) throws IOException {
351    accessChecker.requirePermission(getActiveUser(ctx), request, tableName, family, qualifier, null,
352      permissions);
353  }
354
355  public void requireTablePermission(ObserverContext<?> ctx, String request, TableName tableName,
356    byte[] family, byte[] qualifier, Action... permissions) throws IOException {
357    accessChecker.requireTablePermission(getActiveUser(ctx), request, tableName, family, qualifier,
358      permissions);
359  }
360
361  public void checkLockPermissions(ObserverContext<?> ctx, String namespace, TableName tableName,
362    RegionInfo[] regionInfos, String reason) throws IOException {
363    accessChecker.checkLockPermissions(getActiveUser(ctx), namespace, tableName, regionInfos,
364      reason);
365  }
366
367  /**
368   * Returns <code>true</code> if the current user is allowed the given action over at least one of
369   * the column qualifiers in the given column families.
370   */
371  private boolean hasFamilyQualifierPermission(User user, Action perm,
372    RegionCoprocessorEnvironment env, Map<byte[], ? extends Collection<byte[]>> familyMap)
373    throws IOException {
374    RegionInfo hri = env.getRegion().getRegionInfo();
375    TableName tableName = hri.getTable();
376
377    if (user == null) {
378      return false;
379    }
380
381    if (familyMap != null && familyMap.size() > 0) {
382      // at least one family must be allowed
383      for (Map.Entry<byte[], ? extends Collection<byte[]>> family : familyMap.entrySet()) {
384        if (family.getValue() != null && !family.getValue().isEmpty()) {
385          for (byte[] qualifier : family.getValue()) {
386            if (
387              getAuthManager().authorizeUserTable(user, tableName, family.getKey(), qualifier, perm)
388            ) {
389              return true;
390            }
391          }
392        } else {
393          if (getAuthManager().authorizeUserFamily(user, tableName, family.getKey(), perm)) {
394            return true;
395          }
396        }
397      }
398    } else if (LOG.isDebugEnabled()) {
399      LOG.debug("Empty family map passed for permission check");
400    }
401
402    return false;
403  }
404
405  private enum OpType {
406    GET("get"),
407    EXISTS("exists"),
408    SCAN("scan"),
409    PUT("put"),
410    DELETE("delete"),
411    CHECK_AND_PUT("checkAndPut"),
412    CHECK_AND_DELETE("checkAndDelete"),
413    APPEND("append"),
414    INCREMENT("increment");
415
416    private String type;
417
418    private OpType(String type) {
419      this.type = type;
420    }
421
422    @Override
423    public String toString() {
424      return type;
425    }
426  }
427
  /**
   * Determine if cell ACLs covered by the operation grant access. This is expensive.
   * <p>
   * Builds a {@link Get} for the row restricted to the families and columns present in
   * {@code familyMap}, scans the matching cells of the region one at a time, and asks the auth
   * manager whether the user holds every requested action on each covering cell.
   * </p>
   * @param user      the user whose cell level permissions are checked
   * @param request   the operation type; PUT and DELETE additionally take the timestamps of the
   *                  individual cells into account
   * @param e         the region coprocessor environment
   * @param row       the row the operation targets
   * @param familyMap column families mapped to either a Set of qualifiers, a List of cells, or
   *                  {@code null}
   * @param opTs      the timestamp of the operation
   * @param actions   the actions that every covering cell must grant
   * @return {@code true} only if at least one covering cell was found and no examined cell denied
   *         any of the actions; {@code false} if cell features are disabled, if any cell denies
   *         an action, or if no covering cell granted access
   * @throws IOException if obtaining the scanner fails, or an {@link AccessDeniedException} is
   *                     raised during the scan; any other IOException during the scan is logged
   *                     and not rethrown
   */
  private boolean checkCoveringPermission(User user, OpType request, RegionCoprocessorEnvironment e,
    byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
    throws IOException {
    // Cell ACLs can not exist without cell features, so there is nothing that could grant access.
    if (!cellFeaturesEnabled) {
      return false;
    }
    // Number of covering cells on which every requested action was authorized.
    long cellGrants = 0;
    // Largest timestamp seen among the cells of the operation (PUT/DELETE only).
    long latestCellTs = 0;
    Get get = new Get(row);
    // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
    // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
    // version. We have to get every cell version and check its TS against the TS asked for in
    // Mutation and skip those Cells which is outside this Mutation TS. In case of Put, we have to
    // consider only one such passing cell. In case of Delete we have to consider all the cell
    // versions under this passing version. When Delete Mutation contains columns which are a
    // version delete just consider only one version for those column cells.
    boolean considerCellTs = (request == OpType.PUT || request == OpType.DELETE);
    if (considerCellTs) {
      get.setMaxVersions();
    } else {
      get.setMaxVersions(1);
    }
    // Becomes true once any cell of the operation carries a timestamp other than opTs.
    boolean diffCellTsFromOpTs = false;
    for (Map.Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
      byte[] col = entry.getKey();
      // TODO: HBASE-7114 could possibly unify the collection type in family
      // maps so we would not need to do this
      if (entry.getValue() instanceof Set) {
        // Set form: the values are qualifiers; an empty set selects the whole family.
        Set<byte[]> set = (Set<byte[]>) entry.getValue();
        if (set == null || set.isEmpty()) {
          get.addFamily(col);
        } else {
          for (byte[] qual : set) {
            get.addColumn(col, qual);
          }
        }
      } else if (entry.getValue() instanceof List) {
        // List form: the values are the cells of a mutation; an empty list selects the family.
        List<Cell> list = (List<Cell>) entry.getValue();
        if (list == null || list.isEmpty()) {
          get.addFamily(col);
        } else {
          // In case of family delete, a Cell will be added into the list with Qualifier as null.
          for (Cell cell : list) {
            if (
              cell.getQualifierLength() == 0 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
                || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())
            ) {
              get.addFamily(col);
            } else {
              get.addColumn(col, CellUtil.cloneQualifier(cell));
            }
            if (considerCellTs) {
              long cellTs = cell.getTimestamp();
              latestCellTs = Math.max(latestCellTs, cellTs);
              diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
            }
          }
        }
      } else if (entry.getValue() == null) {
        get.addFamily(col);
      } else {
        throw new RuntimeException(
          "Unhandled collection type " + entry.getValue().getClass().getName());
      }
    }
    // We want to avoid looking into the future. So, if the cells of the
    // operation specify a timestamp, or the operation itself specifies a
    // timestamp, then we use the maximum ts found. Otherwise, we bound
    // the Get to the current server time. We add 1 to the timerange since
    // the upper bound of a timerange is exclusive yet we need to examine
    // any cells found there inclusively.
    long latestTs = Math.max(opTs, latestCellTs);
    if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
      latestTs = EnvironmentEdgeManager.currentTime();
    }
    get.setTimeRange(0, latestTs + 1);
    // In case of Put operation we set to read all versions. This was done to consider the case
    // where columns are added with TS other than the Mutation TS. But normally this won't be the
    // case with Put. There is no need to get all versions but get latest version only.
    if (!diffCellTsFromOpTs && request == OpType.PUT) {
      get.setMaxVersions(1);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Scanning for cells with " + get);
    }
    // This Map is identical to familyMap. The key is a BR rather than byte[].
    // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
    // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
    // Only List valued entries are copied; Set valued and null entries are left out.
    Map<ByteRange, List<Cell>> familyMap1 = new HashMap<>();
    for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
      if (entry.getValue() instanceof List) {
        familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
      }
    }
    RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
    List<Cell> cells = Lists.newArrayList();
    // Previously visited cell, used to detect when the scan moves on to a new column.
    Cell prevCell = null;
    // Reusable lookup key into familyMap1 for the family of the current cell.
    ByteRange curFam = new SimpleMutableByteRange();
    // Whether every version of the current column has to be checked, or just the first covering
    // one. Starts as true for DELETE and is refined per column below.
    boolean curColAllVersions = (request == OpType.DELETE);
    // Cells newer than this timestamp are not covered by the operation for the current column.
    long curColCheckTs = opTs;
    // Whether a covering cell has already been checked for the current column.
    boolean foundColumn = false;
    try {
      boolean more = false;
      ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();

      do {
        cells.clear();
        // scan with limit as 1 to hold down memory use on wide rows
        more = scanner.next(cells, scannerContext);
        for (Cell cell : cells) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Found cell " + cell);
          }
          boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
          if (colChange) foundColumn = false;
          prevCell = cell;
          // One covering version is enough for this column, skip its remaining versions.
          if (!curColAllVersions && foundColumn) {
            continue;
          }
          if (colChange && considerCellTs) {
            // Find the cell of the operation that addresses this column, to learn its timestamp
            // and delete type.
            curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
            // NOTE(review): cols is null when the family has no List entry in familyMap, which
            // would fail in the loop below. Presumably PUT/DELETE always pass Lists - confirm.
            List<Cell> cols = familyMap1.get(curFam);
            for (Cell col : cols) {
              // null/empty qualifier is used to denote a Family delete. The TS and delete type
              // associated with this is applicable for all columns within the family. That is
              // why the below (col.getQualifierLength() == 0) check.
              if (
                (col.getQualifierLength() == 0 && request == OpType.DELETE)
                  || CellUtil.matchingQualifier(cell, col)
              ) {
                byte type = col.getTypeByte();
                if (considerCellTs) {
                  curColCheckTs = col.getTimestamp();
                }
                // For a Delete op we pass allVersions as true. When a Delete Mutation contains
                // a version delete for a column no need to check all the covering cells within
                // that column. Check all versions when Type is DeleteColumn or DeleteFamily
                // One version delete types are Delete/DeleteFamilyVersion
                curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
                  || (KeyValue.Type.DeleteFamily.getCode() == type);
                break;
              }
            }
          }
          if (cell.getTimestamp() > curColCheckTs) {
            // Just ignore this cell. This is not a covering cell.
            continue;
          }
          foundColumn = true;
          for (Action action : actions) {
            // Are there permissions for this user for the cell?
            if (!getAuthManager().authorizeCell(user, getTableName(e), cell, action)) {
              // We can stop if the cell ACL denies access
              return false;
            }
          }
          cellGrants++;
        }
      } while (more);
    } catch (AccessDeniedException ex) {
      throw ex;
    } catch (IOException ex) {
      // Not rethrown: the result below is based on the grants counted before the failure.
      LOG.error("Exception while getting cells to calculate covering permission", ex);
    } finally {
      scanner.close();
    }
    // We should not authorize unless we have found one or more cell ACLs that
    // grant access. This code is used to check for additional permissions
    // after no table or CF grants are found.
    return cellGrants > 0;
  }
603
604  private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
605    // Iterate over the entries in the familyMap, replacing the cells therein
606    // with new cells including the ACL data
607    for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
608      List<Cell> newCells = Lists.newArrayList();
609      for (Cell cell : e.getValue()) {
610        // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
611        List<Tag> tags = new ArrayList<>();
612        tags.add(new ArrayBackedTag(PermissionStorage.ACL_TAG_TYPE, perms));
613        Iterator<Tag> tagIterator = PrivateCellUtil.tagsIterator(cell);
614        while (tagIterator.hasNext()) {
615          tags.add(tagIterator.next());
616        }
617        newCells.add(PrivateCellUtil.createCell(cell, tags));
618      }
619      // This is supposed to be safe, won't CME
620      e.setValue(newCells);
621    }
622  }
623
624  // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
625  // type is reserved and should not be explicitly set by user.
626  private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
627    // No need to check if we're not going to throw
628    if (!authorizationEnabled) {
629      m.setAttribute(TAG_CHECK_PASSED, TRUE);
630      return;
631    }
632    // Superusers are allowed to store cells unconditionally.
633    if (Superusers.isSuperUser(user)) {
634      m.setAttribute(TAG_CHECK_PASSED, TRUE);
635      return;
636    }
637    // We already checked (prePut vs preBatchMutation)
638    if (m.getAttribute(TAG_CHECK_PASSED) != null) {
639      return;
640    }
641    for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
642      Iterator<Tag> tagsItr = PrivateCellUtil.tagsIterator(cellScanner.current());
643      while (tagsItr.hasNext()) {
644        if (tagsItr.next().getType() == PermissionStorage.ACL_TAG_TYPE) {
645          throw new AccessDeniedException("Mutation contains cell with reserved type tag");
646        }
647      }
648    }
649    m.setAttribute(TAG_CHECK_PASSED, TRUE);
650  }
651
652  /* ---- MasterObserver implementation ---- */
653  @Override
654  public void start(CoprocessorEnvironment env) throws IOException {
655    CompoundConfiguration conf = new CompoundConfiguration();
656    conf.add(env.getConfiguration());
657
658    authorizationEnabled = AccessChecker.isAuthorizationSupported(conf);
659    if (!authorizationEnabled) {
660      LOG.warn("AccessController has been loaded with authorization checks DISABLED!");
661    }
662
663    shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
664      AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
665
666    cellFeaturesEnabled = (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
667    if (!cellFeaturesEnabled) {
668      LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
669        + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
670        + " accordingly.");
671    }
672
673    if (env instanceof MasterCoprocessorEnvironment) {
674      // if running on HMaster
675      MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
676      if (mEnv instanceof HasMasterServices) {
677        MasterServices masterServices = ((HasMasterServices) mEnv).getMasterServices();
678        zkPermissionWatcher = masterServices.getZKPermissionWatcher();
679        accessChecker = masterServices.getAccessChecker();
680      }
681    } else if (env instanceof RegionServerCoprocessorEnvironment) {
682      RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
683      if (rsEnv instanceof HasRegionServerServices) {
684        RegionServerServices rsServices =
685          ((HasRegionServerServices) rsEnv).getRegionServerServices();
686        zkPermissionWatcher = rsServices.getZKPermissionWatcher();
687        accessChecker = rsServices.getAccessChecker();
688      }
689    } else if (env instanceof RegionCoprocessorEnvironment) {
690      // if running at region
691      regionEnv = (RegionCoprocessorEnvironment) env;
692      conf.addBytesMap(regionEnv.getRegion().getTableDescriptor().getValues());
693      compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
694        AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
695      if (regionEnv instanceof HasRegionServerServices) {
696        RegionServerServices rsServices =
697          ((HasRegionServerServices) regionEnv).getRegionServerServices();
698        zkPermissionWatcher = rsServices.getZKPermissionWatcher();
699        accessChecker = rsServices.getAccessChecker();
700      }
701    }
702
703    if (zkPermissionWatcher == null) {
704      throw new NullPointerException("ZKPermissionWatcher is null");
705    } else if (accessChecker == null) {
706      throw new NullPointerException("AccessChecker is null");
707    }
708    // set the user-provider.
709    this.userProvider = UserProvider.instantiate(env.getConfiguration());
710    tableAcls = new MapMaker().weakValues().makeMap();
711  }
712
713  @Override
714  public void stop(CoprocessorEnvironment env) {
715  }
716
717  /*********************************** Observer/Service Getters ***********************************/
718  @Override
719  public Optional<RegionObserver> getRegionObserver() {
720    return Optional.of(this);
721  }
722
723  @Override
724  public Optional<MasterObserver> getMasterObserver() {
725    return Optional.of(this);
726  }
727
728  @Override
729  public Optional<EndpointObserver> getEndpointObserver() {
730    return Optional.of(this);
731  }
732
733  @Override
734  public Optional<BulkLoadObserver> getBulkLoadObserver() {
735    return Optional.of(this);
736  }
737
738  @Override
739  public Optional<RegionServerObserver> getRegionServerObserver() {
740    return Optional.of(this);
741  }
742
743  @Override
744  public Iterable<Service> getServices() {
745    return Collections
746      .singleton(AccessControlProtos.AccessControlService.newReflectiveService(this));
747  }
748
749  /*********************************** Observer implementations ***********************************/
750
751  @Override
752  public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c, TableDescriptor desc,
753    RegionInfo[] regions) throws IOException {
754    Set<byte[]> families = desc.getColumnFamilyNames();
755    Map<byte[], Set<byte[]>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
756    for (byte[] family : families) {
757      familyMap.put(family, null);
758    }
759    requireNamespacePermission(c, "createTable", desc.getTableName().getNamespaceAsString(),
760      desc.getTableName(), familyMap, Action.ADMIN, Action.CREATE);
761  }
762
763  @Override
764  public void postCompletedCreateTableAction(final ObserverContext<MasterCoprocessorEnvironment> c,
765    final TableDescriptor desc, final RegionInfo[] regions) throws IOException {
766    // When AC is used, it should be configured as the 1st CP.
767    // In Master, the table operations like create, are handled by a Thread pool but the max size
768    // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
769    // sequentially only.
770    // Related code in HMaster#startServiceThreads
771    // {code}
772    // // We depend on there being only one instance of this executor running
773    // // at a time. To do concurrency, would need fencing of enable/disable of
774    // // tables.
775    // this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
776    // {code}
777    // In future if we change this pool to have more threads, then there is a chance for thread,
778    // creating acl table, getting delayed and by that time another table creation got over and
779    // this hook is getting called. In such a case, we will need a wait logic here which will
780    // wait till the acl table is created.
781    if (PermissionStorage.isAclTable(desc)) {
782      this.aclTabAvailable = true;
783    } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
784      if (!aclTabAvailable) {
785        LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
786          + PermissionStorage.ACL_TABLE_NAME + " is not yet created. " + getClass().getSimpleName()
787          + " should be configured as the first Coprocessor");
788      } else {
789        String owner = desc.getOwnerString();
790        // default the table owner to current user, if not specified.
791        if (owner == null) owner = getActiveUser(c).getShortName();
792        final UserPermission userPermission = new UserPermission(owner,
793          Permission.newBuilder(desc.getTableName()).withActions(Action.values()).build());
794        // switch to the real hbase master user for doing the RPC on the ACL table
795        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
796          @Override
797          public Void run() throws Exception {
798            try (Table table =
799              c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
800              PermissionStorage.addUserPermission(c.getEnvironment().getConfiguration(),
801                userPermission, table);
802            }
803            return null;
804          }
805        });
806      }
807    }
808  }
809
810  @Override
811  public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
812    throws IOException {
813    requirePermission(c, "deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
814  }
815
816  @Override
817  public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
818    final TableName tableName) throws IOException {
819    final Configuration conf = c.getEnvironment().getConfiguration();
820    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
821      @Override
822      public Void run() throws Exception {
823        try (Table table =
824          c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
825          PermissionStorage.removeTablePermissions(conf, tableName, table);
826        }
827        return null;
828      }
829    });
830    zkPermissionWatcher.deleteTableACLNode(tableName);
831  }
832
833  @Override
834  public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
835    final TableName tableName) throws IOException {
836    requirePermission(c, "truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
837
838    final Configuration conf = c.getEnvironment().getConfiguration();
839    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
840      @Override
841      public Void run() throws Exception {
842        List<UserPermission> acls =
843          PermissionStorage.getUserTablePermissions(conf, tableName, null, null, null, false);
844        if (acls != null) {
845          tableAcls.put(tableName, acls);
846        }
847        return null;
848      }
849    });
850  }
851
852  @Override
853  public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
854    final TableName tableName) throws IOException {
855    final Configuration conf = ctx.getEnvironment().getConfiguration();
856    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
857      @Override
858      public Void run() throws Exception {
859        List<UserPermission> perms = tableAcls.get(tableName);
860        if (perms != null) {
861          for (UserPermission perm : perms) {
862            try (Table table =
863              ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
864              PermissionStorage.addUserPermission(conf, perm, table);
865            }
866          }
867        }
868        tableAcls.remove(tableName);
869        return null;
870      }
871    });
872  }
873
874  @Override
875  public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
876    TableName tableName, TableDescriptor currentDesc, TableDescriptor newDesc) throws IOException {
877    // TODO: potentially check if this is a add/modify/delete column operation
878    requirePermission(c, "modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
879    return newDesc;
880  }
881
882  @Override
883  public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
884    final TableDescriptor htd) throws IOException {
885    final Configuration conf = c.getEnvironment().getConfiguration();
886    // default the table owner to current user, if not specified.
887    final String owner =
888      (htd.getOwnerString() != null) ? htd.getOwnerString() : getActiveUser(c).getShortName();
889    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
890      @Override
891      public Void run() throws Exception {
892        UserPermission userperm = new UserPermission(owner,
893          Permission.newBuilder(htd.getTableName()).withActions(Action.values()).build());
894        try (Table table =
895          c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
896          PermissionStorage.addUserPermission(conf, userperm, table);
897        }
898        return null;
899      }
900    });
901  }
902
903  public String preModifyTableStoreFileTracker(ObserverContext<MasterCoprocessorEnvironment> c,
904    TableName tableName, String dstSFT) throws IOException {
905    requirePermission(c, "modifyTableStoreFileTracker", tableName, null, null, Action.ADMIN,
906      Action.CREATE);
907    return dstSFT;
908  }
909
910  @Override
911  public String preModifyColumnFamilyStoreFileTracker(
912    ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName, byte[] family,
913    String dstSFT) throws IOException {
914    requirePermission(c, "modifyColumnFamilyStoreFileTracker", tableName, family, null,
915      Action.ADMIN, Action.CREATE);
916    return dstSFT;
917  }
918
919  @Override
920  public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
921    throws IOException {
922    requirePermission(c, "enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
923  }
924
925  @Override
926  public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
927    throws IOException {
928    if (Bytes.equals(tableName.getName(), PermissionStorage.ACL_GLOBAL_NAME)) {
929      // We have to unconditionally disallow disable of the ACL table when we are installed,
930      // even if not enforcing authorizations. We are still allowing grants and revocations,
931      // checking permissions and logging audit messages, etc. If the ACL table is not
932      // available we will fail random actions all over the place.
933      throw new AccessDeniedException("Not allowed to disable " + PermissionStorage.ACL_TABLE_NAME
934        + " table with AccessController installed");
935    }
936    requirePermission(c, "disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
937  }
938
939  @Override
940  public void preAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx,
941    final long procId) throws IOException {
942    requirePermission(ctx, "abortProcedure", Action.ADMIN);
943  }
944
945  @Override
946  public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
947    throws IOException {
948    // There is nothing to do at this time after the procedure abort request was sent.
949  }
950
951  @Override
952  public void preGetProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
953    throws IOException {
954    requirePermission(ctx, "getProcedure", Action.ADMIN);
955  }
956
957  @Override
958  public void preGetLocks(ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
959    User user = getActiveUser(ctx);
960    accessChecker.requirePermission(user, "getLocks", null, Action.ADMIN);
961  }
962
963  @Override
964  public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo region,
965    ServerName srcServer, ServerName destServer) throws IOException {
966    requirePermission(c, "move", region.getTable(), null, null, Action.ADMIN);
967  }
968
969  @Override
970  public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo regionInfo)
971    throws IOException {
972    requirePermission(c, "assign", regionInfo.getTable(), null, null, Action.ADMIN);
973  }
974
975  @Override
976  public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo regionInfo)
977    throws IOException {
978    requirePermission(c, "unassign", regionInfo.getTable(), null, null, Action.ADMIN);
979  }
980
981  @Override
982  public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
983    RegionInfo regionInfo) throws IOException {
984    requirePermission(c, "regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
985  }
986
987  @Override
988  public void preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
989    final boolean newValue, final MasterSwitchType switchType) throws IOException {
990    requirePermission(ctx, "setSplitOrMergeEnabled", Action.ADMIN);
991  }
992
993  @Override
994  public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c, BalanceRequest request)
995    throws IOException {
996    requirePermission(c, "balance", Action.ADMIN);
997  }
998
999  @Override
1000  public void preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c, boolean newValue)
1001    throws IOException {
1002    requirePermission(c, "balanceSwitch", Action.ADMIN);
1003  }
1004
1005  @Override
1006  public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c) throws IOException {
1007    requirePermission(c, "shutdown", Action.ADMIN);
1008  }
1009
1010  @Override
1011  public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c) throws IOException {
1012    requirePermission(c, "stopMaster", Action.ADMIN);
1013  }
1014
1015  @Override
1016  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1017    throws IOException {
1018    try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
1019      if (!admin.tableExists(PermissionStorage.ACL_TABLE_NAME)) {
1020        createACLTable(admin);
1021      } else {
1022        this.aclTabAvailable = true;
1023      }
1024    }
1025  }
1026
1027  /**
1028   * Create the ACL table
1029   */
1030  private static void createACLTable(Admin admin) throws IOException {
1031    /** Table descriptor for ACL table */
1032    ColumnFamilyDescriptor cfd =
1033      ColumnFamilyDescriptorBuilder.newBuilder(PermissionStorage.ACL_LIST_FAMILY).setMaxVersions(1)
1034        .setInMemory(true).setBlockCacheEnabled(true).setBlocksize(8 * 1024)
1035        .setBloomFilterType(BloomType.NONE).setScope(HConstants.REPLICATION_SCOPE_LOCAL).build();
1036    TableDescriptor td = TableDescriptorBuilder.newBuilder(PermissionStorage.ACL_TABLE_NAME)
1037      .setColumnFamily(cfd).build();
1038    admin.createTable(td);
1039  }
1040
1041  @Override
1042  public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1043    final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor) throws IOException {
1044    // Move this ACL check to SnapshotManager#checkPermissions as part of AC deprecation.
1045    requirePermission(ctx, "snapshot " + snapshot.getName(), hTableDescriptor.getTableName(), null,
1046      null, Permission.Action.ADMIN);
1047  }
1048
1049  @Override
1050  public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1051    final SnapshotDescription snapshot) throws IOException {
1052    User user = getActiveUser(ctx);
1053    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1054      // list it, if user is the owner of snapshot
1055      AuthResult result = AuthResult.allow("listSnapshot " + snapshot.getName(),
1056        "Snapshot owner check allowed", user, null, null, null);
1057      AccessChecker.logResult(result);
1058    } else {
1059      accessChecker.requirePermission(user, "listSnapshot " + snapshot.getName(), null,
1060        Action.ADMIN);
1061    }
1062  }
1063
1064  @Override
1065  public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1066    final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor) throws IOException {
1067    User user = getActiveUser(ctx);
1068    if (
1069      SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)
1070        && hTableDescriptor.getTableName().getNameAsString().equals(snapshot.getTable())
1071    ) {
1072      // Snapshot owner is allowed to create a table with the same name as the snapshot he took
1073      AuthResult result = AuthResult.allow("cloneSnapshot " + snapshot.getName(),
1074        "Snapshot owner check allowed", user, null, hTableDescriptor.getTableName(), null);
1075      AccessChecker.logResult(result);
1076    } else if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1077      requireNamespacePermission(ctx, "cloneSnapshot",
1078        hTableDescriptor.getTableName().getNamespaceAsString(), Action.ADMIN);
1079    } else {
1080      accessChecker.requirePermission(user, "cloneSnapshot " + snapshot.getName(), null,
1081        Action.ADMIN);
1082    }
1083  }
1084
1085  @Override
1086  public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1087    final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor) throws IOException {
1088    User user = getActiveUser(ctx);
1089    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1090      accessChecker.requirePermission(user, "restoreSnapshot " + snapshot.getName(),
1091        hTableDescriptor.getTableName(), null, null, null, Permission.Action.ADMIN);
1092    } else {
1093      accessChecker.requirePermission(user, "restoreSnapshot " + snapshot.getName(), null,
1094        Action.ADMIN);
1095    }
1096  }
1097
1098  @Override
1099  public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1100    final SnapshotDescription snapshot) throws IOException {
1101    User user = getActiveUser(ctx);
1102    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1103      // Snapshot owner is allowed to delete the snapshot
1104      AuthResult result = AuthResult.allow("deleteSnapshot " + snapshot.getName(),
1105        "Snapshot owner check allowed", user, null, null, null);
1106      AccessChecker.logResult(result);
1107    } else {
1108      accessChecker.requirePermission(user, "deleteSnapshot " + snapshot.getName(), null,
1109        Action.ADMIN);
1110    }
1111  }
1112
1113  @Override
1114  public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1115    NamespaceDescriptor ns) throws IOException {
1116    requireGlobalPermission(ctx, "createNamespace", Action.ADMIN, ns.getName());
1117  }
1118
1119  @Override
1120  public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1121    String namespace) throws IOException {
1122    requireGlobalPermission(ctx, "deleteNamespace", Action.ADMIN, namespace);
1123  }
1124
1125  @Override
1126  public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1127    final String namespace) throws IOException {
1128    final Configuration conf = ctx.getEnvironment().getConfiguration();
1129    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1130      @Override
1131      public Void run() throws Exception {
1132        try (Table table =
1133          ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
1134          PermissionStorage.removeNamespacePermissions(conf, namespace, table);
1135        }
1136        return null;
1137      }
1138    });
1139    zkPermissionWatcher.deleteNamespaceACLNode(namespace);
1140    LOG.info(namespace + " entry deleted in " + PermissionStorage.ACL_TABLE_NAME + " table.");
1141  }
1142
1143  @Override
1144  public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1145    NamespaceDescriptor ns) throws IOException {
1146    // We require only global permission so that
1147    // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1148    requireGlobalPermission(ctx, "modifyNamespace", Action.ADMIN, ns.getName());
1149  }
1150
1151  @Override
1152  public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
1153    String namespace) throws IOException {
1154    requireNamespacePermission(ctx, "getNamespaceDescriptor", namespace, Action.ADMIN);
1155  }
1156
1157  @Override
1158  public void postListNamespaces(ObserverContext<MasterCoprocessorEnvironment> ctx,
1159    List<String> namespaces) throws IOException {
1160    /* always allow namespace listing */
1161  }
1162
1163  @Override
1164  public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1165    List<NamespaceDescriptor> descriptors) throws IOException {
1166    // Retains only those which passes authorization checks, as the checks weren't done as part
1167    // of preGetTableDescriptors.
1168    Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1169    User user = getActiveUser(ctx);
1170    while (itr.hasNext()) {
1171      NamespaceDescriptor desc = itr.next();
1172      try {
1173        accessChecker.requireNamespacePermission(user, "listNamespaces", desc.getName(), null,
1174          Action.ADMIN);
1175      } catch (AccessDeniedException e) {
1176        itr.remove();
1177      }
1178    }
1179  }
1180
1181  @Override
1182  public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1183    final TableName tableName) throws IOException {
1184    // Move this ACL check to MasterFlushTableProcedureManager#checkPermissions as part of AC
1185    // deprecation.
1186    requirePermission(ctx, "flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1187  }
1188
1189  @Override
1190  public void preSplitRegion(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1191    final TableName tableName, final byte[] splitRow) throws IOException {
1192    requirePermission(ctx, "split", tableName, null, null, Action.ADMIN);
1193  }
1194
1195  @Override
1196  public void preClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
1197    throws IOException {
1198    requirePermission(ctx, "clearDeadServers", Action.ADMIN);
1199  }
1200
1201  @Override
1202  public void preDecommissionRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
1203    List<ServerName> servers, boolean offload) throws IOException {
1204    requirePermission(ctx, "decommissionRegionServers", Action.ADMIN);
1205  }
1206
1207  @Override
1208  public void preListDecommissionedRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
1209    throws IOException {
1210    requirePermission(ctx, "listDecommissionedRegionServers", Action.READ);
1211  }
1212
1213  @Override
1214  public void preRecommissionRegionServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
1215    ServerName server, List<byte[]> encodedRegionNames) throws IOException {
1216    requirePermission(ctx, "recommissionRegionServers", Action.ADMIN);
1217  }
1218
1219  /* ---- RegionObserver implementation ---- */
1220
1221  @Override
1222  public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
1223    RegionCoprocessorEnvironment env = c.getEnvironment();
1224    final Region region = env.getRegion();
1225    if (region == null) {
1226      LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1227    } else {
1228      RegionInfo regionInfo = region.getRegionInfo();
1229      if (regionInfo.getTable().isSystemTable()) {
1230        checkSystemOrSuperUser(getActiveUser(c));
1231      } else {
1232        requirePermission(c, "preOpen", Action.ADMIN);
1233      }
1234    }
1235  }
1236
1237  @Override
1238  public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1239    RegionCoprocessorEnvironment env = c.getEnvironment();
1240    final Region region = env.getRegion();
1241    if (region == null) {
1242      LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1243      return;
1244    }
1245    if (PermissionStorage.isAclRegion(region)) {
1246      aclRegion = true;
1247      try {
1248        initialize(env);
1249      } catch (IOException ex) {
1250        // if we can't obtain permissions, it's better to fail
1251        // than perform checks incorrectly
1252        throw new RuntimeException("Failed to initialize permissions cache", ex);
1253      }
1254    } else {
1255      initialized = true;
1256    }
1257  }
1258
1259  @Override
1260  public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
1261    FlushLifeCycleTracker tracker) throws IOException {
1262    requirePermission(c, "flush", getTableName(c.getEnvironment()), null, null, Action.ADMIN,
1263      Action.CREATE);
1264  }
1265
1266  @Override
1267  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
1268    InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
1269    CompactionRequest request) throws IOException {
1270    requirePermission(c, "compact", getTableName(c.getEnvironment()), null, null, Action.ADMIN,
1271      Action.CREATE);
1272    return scanner;
1273  }
1274
1275  private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1276    final Query query, OpType opType) throws IOException {
1277    Filter filter = query.getFilter();
1278    // Don't wrap an AccessControlFilter
1279    if (filter != null && filter instanceof AccessControlFilter) {
1280      return;
1281    }
1282    User user = getActiveUser(c);
1283    RegionCoprocessorEnvironment env = c.getEnvironment();
1284    Map<byte[], ? extends Collection<byte[]>> families = null;
1285    switch (opType) {
1286      case GET:
1287      case EXISTS:
1288        families = ((Get) query).getFamilyMap();
1289        break;
1290      case SCAN:
1291        families = ((Scan) query).getFamilyMap();
1292        break;
1293      default:
1294        throw new RuntimeException("Unhandled operation " + opType);
1295    }
1296    AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1297    Region region = getRegion(env);
1298    TableName table = getTableName(region);
1299    Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1300    for (ColumnFamilyDescriptor hcd : region.getTableDescriptor().getColumnFamilies()) {
1301      cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1302    }
1303    if (!authResult.isAllowed()) {
1304      if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1305        // Old behavior: Scan with only qualifier checks if we have partial
1306        // permission. Backwards compatible behavior is to throw an
1307        // AccessDeniedException immediately if there are no grants for table
1308        // or CF or CF+qual. Only proceed with an injected filter if there are
1309        // grants for qualifiers. Otherwise we will fall through below and log
1310        // the result and throw an ADE. We may end up checking qualifier
1311        // grants three times (permissionGranted above, here, and in the
1312        // filter) but that's the price of backwards compatibility.
1313        if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1314          authResult.setAllowed(true);
1315          authResult.setReason("Access allowed with filter");
1316          // Only wrap the filter if we are enforcing authorizations
1317          if (authorizationEnabled) {
1318            Filter ourFilter = new AccessControlFilter(getAuthManager(), user, table,
1319              AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY, cfVsMaxVersions);
1320            // wrap any existing filter
1321            if (filter != null) {
1322              ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1323                Lists.newArrayList(ourFilter, filter));
1324            }
1325            switch (opType) {
1326              case GET:
1327              case EXISTS:
1328                ((Get) query).setFilter(ourFilter);
1329                break;
1330              case SCAN:
1331                ((Scan) query).setFilter(ourFilter);
1332                break;
1333              default:
1334                throw new RuntimeException("Unhandled operation " + opType);
1335            }
1336          }
1337        }
1338      } else {
1339        // New behavior: Any access we might be granted is more fine-grained
1340        // than whole table or CF. Simply inject a filter and return what is
1341        // allowed. We will not throw an AccessDeniedException. This is a
1342        // behavioral change since 0.96.
1343        authResult.setAllowed(true);
1344        authResult.setReason("Access allowed with filter");
1345        // Only wrap the filter if we are enforcing authorizations
1346        if (authorizationEnabled) {
1347          Filter ourFilter = new AccessControlFilter(getAuthManager(), user, table,
1348            AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1349          // wrap any existing filter
1350          if (filter != null) {
1351            ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1352              Lists.newArrayList(ourFilter, filter));
1353          }
1354          switch (opType) {
1355            case GET:
1356            case EXISTS:
1357              ((Get) query).setFilter(ourFilter);
1358              break;
1359            case SCAN:
1360              ((Scan) query).setFilter(ourFilter);
1361              break;
1362            default:
1363              throw new RuntimeException("Unhandled operation " + opType);
1364          }
1365        }
1366      }
1367    }
1368
1369    AccessChecker.logResult(authResult);
1370    if (authorizationEnabled && !authResult.isAllowed()) {
1371      throw new AccessDeniedException("Insufficient permissions for user '"
1372        + (user != null ? user.getShortName() : "null") + "' (table=" + table + ", action=READ)");
1373    }
1374  }
1375
1376  @Override
1377  public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c, final Get get,
1378    final List<Cell> result) throws IOException {
1379    internalPreRead(c, get, OpType.GET);
1380  }
1381
1382  @Override
1383  public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c, final Get get,
1384    final boolean exists) throws IOException {
1385    internalPreRead(c, get, OpType.EXISTS);
1386    return exists;
1387  }
1388
1389  @Override
1390  public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
1391    final WALEdit edit, final Durability durability) throws IOException {
1392    User user = getActiveUser(c);
1393    checkForReservedTagPresence(user, put);
1394
1395    // Require WRITE permission to the table, CF, or top visible value, if any.
1396    // NOTE: We don't need to check the permissions for any earlier Puts
1397    // because we treat the ACLs in each Put as timestamped like any other
1398    // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1399    // change the ACL of any previous Put. This allows simple evolution of
1400    // security policy over time without requiring expensive updates.
1401    RegionCoprocessorEnvironment env = c.getEnvironment();
1402    Map<byte[], ? extends Collection<Cell>> families = put.getFamilyCellMap();
1403    AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1404    AccessChecker.logResult(authResult);
1405    if (!authResult.isAllowed()) {
1406      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1407        put.setAttribute(CHECK_COVERING_PERM, TRUE);
1408      } else if (authorizationEnabled) {
1409        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1410      }
1411    }
1412
1413    // Add cell ACLs from the operation to the cells themselves
1414    byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1415    if (bytes != null) {
1416      if (cellFeaturesEnabled) {
1417        addCellPermissions(bytes, put.getFamilyCellMap());
1418      } else {
1419        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1420      }
1421    }
1422  }
1423
1424  @Override
1425  public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
1426    final WALEdit edit, final Durability durability) {
1427    if (aclRegion) {
1428      updateACL(c.getEnvironment(), put.getFamilyCellMap());
1429    }
1430  }
1431
1432  @Override
1433  public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c, final Delete delete,
1434    final WALEdit edit, final Durability durability) throws IOException {
1435    // An ACL on a delete is useless, we shouldn't allow it
1436    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1437      throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1438    }
1439    // Require WRITE permissions on all cells covered by the delete. Unlike
1440    // for Puts we need to check all visible prior versions, because a major
1441    // compaction could remove them. If the user doesn't have permission to
1442    // overwrite any of the visible versions ('visible' defined as not covered
1443    // by a tombstone already) then we have to disallow this operation.
1444    RegionCoprocessorEnvironment env = c.getEnvironment();
1445    Map<byte[], ? extends Collection<Cell>> families = delete.getFamilyCellMap();
1446    User user = getActiveUser(c);
1447    AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1448    AccessChecker.logResult(authResult);
1449    if (!authResult.isAllowed()) {
1450      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1451        delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1452      } else if (authorizationEnabled) {
1453        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1454      }
1455    }
1456  }
1457
1458  @Override
1459  public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1460    MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1461    if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1462      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1463      User user = getActiveUser(c);
1464      for (int i = 0; i < miniBatchOp.size(); i++) {
1465        Mutation m = miniBatchOp.getOperation(i);
1466        if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1467          // We have a failure with table, cf and q perm checks and now giving a chance for cell
1468          // perm check
1469          OpType opType;
1470          long timestamp;
1471          if (m instanceof Put) {
1472            checkForReservedTagPresence(user, m);
1473            opType = OpType.PUT;
1474            timestamp = m.getTimestamp();
1475          } else if (m instanceof Delete) {
1476            opType = OpType.DELETE;
1477            timestamp = m.getTimestamp();
1478          } else if (m instanceof Increment) {
1479            opType = OpType.INCREMENT;
1480            timestamp = ((Increment) m).getTimeRange().getMax();
1481          } else if (m instanceof Append) {
1482            opType = OpType.APPEND;
1483            timestamp = ((Append) m).getTimeRange().getMax();
1484          } else {
1485            // If the operation type is not Put/Delete/Increment/Append, do nothing
1486            continue;
1487          }
1488          AuthResult authResult = null;
1489          if (
1490            checkCoveringPermission(user, opType, c.getEnvironment(), m.getRow(),
1491              m.getFamilyCellMap(), timestamp, Action.WRITE)
1492          ) {
1493            authResult = AuthResult.allow(opType.toString(), "Covering cell set", user,
1494              Action.WRITE, table, m.getFamilyCellMap());
1495          } else {
1496            authResult = AuthResult.deny(opType.toString(), "Covering cell set", user, Action.WRITE,
1497              table, m.getFamilyCellMap());
1498          }
1499          AccessChecker.logResult(authResult);
1500          if (authorizationEnabled && !authResult.isAllowed()) {
1501            throw new AccessDeniedException(
1502              "Insufficient permissions " + authResult.toContextString());
1503          }
1504        }
1505      }
1506    }
1507  }
1508
1509  @Override
1510  public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c, final Delete delete,
1511    final WALEdit edit, final Durability durability) throws IOException {
1512    if (aclRegion) {
1513      updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1514    }
1515  }
1516
1517  @Override
1518  public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1519    final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op,
1520    final ByteArrayComparable comparator, final Put put, final boolean result) throws IOException {
1521    User user = getActiveUser(c);
1522    checkForReservedTagPresence(user, put);
1523
1524    // Require READ and WRITE permissions on the table, CF, and KV to update
1525    RegionCoprocessorEnvironment env = c.getEnvironment();
1526    Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1527    AuthResult authResult =
1528      permissionGranted(OpType.CHECK_AND_PUT, user, env, families, Action.READ, Action.WRITE);
1529    AccessChecker.logResult(authResult);
1530    if (!authResult.isAllowed()) {
1531      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1532        put.setAttribute(CHECK_COVERING_PERM, TRUE);
1533      } else if (authorizationEnabled) {
1534        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1535      }
1536    }
1537
1538    byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1539    if (bytes != null) {
1540      if (cellFeaturesEnabled) {
1541        addCellPermissions(bytes, put.getFamilyCellMap());
1542      } else {
1543        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1544      }
1545    }
1546    return result;
1547  }
1548
1549  @Override
1550  public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1551    final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator opp,
1552    final ByteArrayComparable comparator, final Put put, final boolean result) throws IOException {
1553    if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1554      // We had failure with table, cf and q perm checks and now giving a chance for cell
1555      // perm check
1556      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1557      Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1558      AuthResult authResult = null;
1559      User user = getActiveUser(c);
1560      if (
1561        checkCoveringPermission(user, OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1562          HConstants.LATEST_TIMESTAMP, Action.READ)
1563      ) {
1564        authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set", user,
1565          Action.READ, table, families);
1566      } else {
1567        authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set", user,
1568          Action.READ, table, families);
1569      }
1570      AccessChecker.logResult(authResult);
1571      if (authorizationEnabled && !authResult.isAllowed()) {
1572        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1573      }
1574    }
1575    return result;
1576  }
1577
1578  @Override
1579  public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1580    final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op,
1581    final ByteArrayComparable comparator, final Delete delete, final boolean result)
1582    throws IOException {
1583    // An ACL on a delete is useless, we shouldn't allow it
1584    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1585      throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " + delete.toString());
1586    }
1587    // Require READ and WRITE permissions on the table, CF, and the KV covered
1588    // by the delete
1589    RegionCoprocessorEnvironment env = c.getEnvironment();
1590    Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1591    User user = getActiveUser(c);
1592    AuthResult authResult =
1593      permissionGranted(OpType.CHECK_AND_DELETE, user, env, families, Action.READ, Action.WRITE);
1594    AccessChecker.logResult(authResult);
1595    if (!authResult.isAllowed()) {
1596      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1597        delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1598      } else if (authorizationEnabled) {
1599        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1600      }
1601    }
1602    return result;
1603  }
1604
1605  @Override
1606  public boolean preCheckAndDeleteAfterRowLock(
1607    final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1608    final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
1609    final Delete delete, final boolean result) throws IOException {
1610    if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1611      // We had failure with table, cf and q perm checks and now giving a chance for cell
1612      // perm check
1613      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1614      Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1615      AuthResult authResult = null;
1616      User user = getActiveUser(c);
1617      if (
1618        checkCoveringPermission(user, OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1619          HConstants.LATEST_TIMESTAMP, Action.READ)
1620      ) {
1621        authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set", user,
1622          Action.READ, table, families);
1623      } else {
1624        authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set", user,
1625          Action.READ, table, families);
1626      }
1627      AccessChecker.logResult(authResult);
1628      if (authorizationEnabled && !authResult.isAllowed()) {
1629        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1630      }
1631    }
1632    return result;
1633  }
1634
1635  @Override
1636  public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1637    throws IOException {
1638    User user = getActiveUser(c);
1639    checkForReservedTagPresence(user, append);
1640
1641    // Require WRITE permission to the table, CF, and the KV to be appended
1642    RegionCoprocessorEnvironment env = c.getEnvironment();
1643    Map<byte[], ? extends Collection<Cell>> families = append.getFamilyCellMap();
1644    AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1645    AccessChecker.logResult(authResult);
1646    if (!authResult.isAllowed()) {
1647      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1648        append.setAttribute(CHECK_COVERING_PERM, TRUE);
1649      } else if (authorizationEnabled) {
1650        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1651      }
1652    }
1653
1654    byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1655    if (bytes != null) {
1656      if (cellFeaturesEnabled) {
1657        addCellPermissions(bytes, append.getFamilyCellMap());
1658      } else {
1659        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1660      }
1661    }
1662
1663    return null;
1664  }
1665
1666  @Override
1667  public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1668    final Increment increment) throws IOException {
1669    User user = getActiveUser(c);
1670    checkForReservedTagPresence(user, increment);
1671
1672    // Require WRITE permission to the table, CF, and the KV to be replaced by
1673    // the incremented value
1674    RegionCoprocessorEnvironment env = c.getEnvironment();
1675    Map<byte[], ? extends Collection<Cell>> families = increment.getFamilyCellMap();
1676    AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families, Action.WRITE);
1677    AccessChecker.logResult(authResult);
1678    if (!authResult.isAllowed()) {
1679      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1680        increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1681      } else if (authorizationEnabled) {
1682        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1683      }
1684    }
1685
1686    byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1687    if (bytes != null) {
1688      if (cellFeaturesEnabled) {
1689        addCellPermissions(bytes, increment.getFamilyCellMap());
1690      } else {
1691        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1692      }
1693    }
1694
1695    return null;
1696  }
1697
1698  @Override
1699  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
1700    ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
1701    List<Pair<Cell, Cell>> cellPairs) throws IOException {
1702    // If the HFile version is insufficient to persist tags, we won't have any
1703    // work to do here
1704    if (!cellFeaturesEnabled || mutation.getACL() == null) {
1705      return cellPairs;
1706    }
1707    return cellPairs.stream()
1708      .map(pair -> new Pair<>(pair.getFirst(),
1709        createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond())))
1710      .collect(Collectors.toList());
1711  }
1712
1713  @Override
1714  public List<Pair<Cell, Cell>> postAppendBeforeWAL(
1715    ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
1716    List<Pair<Cell, Cell>> cellPairs) throws IOException {
1717    // If the HFile version is insufficient to persist tags, we won't have any
1718    // work to do here
1719    if (!cellFeaturesEnabled || mutation.getACL() == null) {
1720      return cellPairs;
1721    }
1722    return cellPairs.stream()
1723      .map(pair -> new Pair<>(pair.getFirst(),
1724        createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond())))
1725      .collect(Collectors.toList());
1726  }
1727
1728  private Cell createNewCellWithTags(Mutation mutation, Cell oldCell, Cell newCell) {
1729    // As Increment and Append operations have already copied the tags of oldCell to the newCell,
1730    // there is no need to rewrite them again. Just extract non-acl tags of newCell if we need to
1731    // add a new acl tag for the cell. Actually, oldCell is useless here.
1732    List<Tag> tags = Lists.newArrayList();
1733    if (newCell != null) {
1734      Iterator<Tag> tagIterator = PrivateCellUtil.tagsIterator(newCell);
1735      while (tagIterator.hasNext()) {
1736        Tag tag = tagIterator.next();
1737        if (tag.getType() != PermissionStorage.ACL_TAG_TYPE) {
1738          // Not an ACL tag, just carry it through
1739          if (LOG.isTraceEnabled()) {
1740            LOG.trace("Carrying forward tag from " + newCell + ": type " + tag.getType()
1741              + " length " + tag.getValueLength());
1742          }
1743          tags.add(tag);
1744        }
1745      }
1746    }
1747
1748    // We have checked the ACL tag of mutation is not null.
1749    // So that the tags could not be empty.
1750    tags.add(new ArrayBackedTag(PermissionStorage.ACL_TAG_TYPE, mutation.getACL()));
1751    return PrivateCellUtil.createCell(newCell, tags);
1752  }
1753
1754  @Override
1755  public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan)
1756    throws IOException {
1757    internalPreRead(c, scan, OpType.SCAN);
1758  }
1759
1760  @Override
1761  public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1762    final Scan scan, final RegionScanner s) throws IOException {
1763    User user = getActiveUser(c);
1764    if (user != null && user.getShortName() != null) {
1765      // store reference to scanner owner for later checks
1766      scannerOwners.put(s, user.getShortName());
1767    }
1768    return s;
1769  }
1770
1771  @Override
1772  public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1773    final InternalScanner s, final List<Result> result, final int limit, final boolean hasNext)
1774    throws IOException {
1775    requireScannerOwner(s);
1776    return hasNext;
1777  }
1778
1779  @Override
1780  public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1781    final InternalScanner s) throws IOException {
1782    requireScannerOwner(s);
1783  }
1784
1785  @Override
1786  public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1787    final InternalScanner s) throws IOException {
1788    // clean up any associated owner mapping
1789    scannerOwners.remove(s);
1790  }
1791
1792  /**
1793   * Verify, when servicing an RPC, that the caller is the scanner owner. If so, we assume that
1794   * access control is correctly enforced based on the checks performed in preScannerOpen()
1795   */
1796  private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
1797    if (!RpcServer.isInRpcCallContext()) {
1798      return;
1799    }
1800    String requestUserName = RpcServer.getRequestUserName().orElse(null);
1801    String owner = scannerOwners.get(s);
1802    if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
1803      throw new AccessDeniedException("User '" + requestUserName + "' is not the scanner owner!");
1804    }
1805  }
1806
1807  /**
1808   * Verifies user has CREATE or ADMIN privileges on the Column Families involved in the
1809   * bulkLoadHFile request. Specific Column Write privileges are presently ignored.
1810   */
1811  @Override
1812  public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1813    List<Pair<byte[], String>> familyPaths) throws IOException {
1814    User user = getActiveUser(ctx);
1815    for (Pair<byte[], String> el : familyPaths) {
1816      accessChecker.requirePermission(user, "preBulkLoadHFile",
1817        ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), el.getFirst(), null,
1818        null, Action.ADMIN, Action.CREATE);
1819    }
1820  }
1821
1822  /**
1823   * Authorization check for SecureBulkLoadProtocol.prepareBulkLoad()
1824   * @param ctx the context
1825   */
1826  @Override
1827  public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
1828    throws IOException {
1829    requireAccess(ctx, "prePrepareBulkLoad",
1830      ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.ADMIN,
1831      Action.CREATE);
1832  }
1833
1834  /**
1835   * Authorization security check for SecureBulkLoadProtocol.cleanupBulkLoad()
1836   * @param ctx the context
1837   */
1838  @Override
1839  public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
1840    throws IOException {
1841    requireAccess(ctx, "preCleanupBulkLoad",
1842      ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.ADMIN,
1843      Action.CREATE);
1844  }
1845
1846  /* ---- EndpointObserver implementation ---- */
1847
1848  @Override
1849  public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1850    Service service, String methodName, Message request) throws IOException {
1851    // Don't intercept calls to our own AccessControlService, we check for
1852    // appropriate permissions in the service handlers
1853    if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
1854      requirePermission(ctx,
1855        "invoke(" + service.getDescriptorForType().getName() + "." + methodName + ")",
1856        getTableName(ctx.getEnvironment()), null, null, Action.EXEC);
1857    }
1858    return request;
1859  }
1860
1861  @Override
1862  public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1863    Service service, String methodName, Message request, Message.Builder responseBuilder)
1864    throws IOException {
1865  }
1866
1867  /* ---- Protobuf AccessControlService implementation ---- */
1868
1869  /**
1870   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1871   *             {@link Admin#grant(UserPermission, boolean)} instead.
1872   * @see Admin#grant(UserPermission, boolean)
1873   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21739">HBASE-21739</a>
1874   */
1875  @Deprecated
1876  @Override
1877  public void grant(RpcController controller, AccessControlProtos.GrantRequest request,
1878    RpcCallback<AccessControlProtos.GrantResponse> done) {
1879    final UserPermission perm = AccessControlUtil.toUserPermission(request.getUserPermission());
1880    AccessControlProtos.GrantResponse response = null;
1881    try {
1882      // verify it's only running at .acl.
1883      if (aclRegion) {
1884        if (!initialized) {
1885          throw new CoprocessorException("AccessController not yet initialized");
1886        }
1887        User caller = RpcServer.getRequestUser().orElse(null);
1888        if (LOG.isDebugEnabled()) {
1889          LOG.debug("Received request from {} to grant access permission {}", caller.getName(),
1890            perm.toString());
1891        }
1892        preGrantOrRevoke(caller, "grant", perm);
1893
1894        // regionEnv is set at #start. Hopefully not null at this point.
1895        regionEnv.getConnection().getAdmin().grant(
1896          new UserPermission(perm.getUser(), perm.getPermission()),
1897          request.getMergeExistingPermissions());
1898        if (AUDITLOG.isTraceEnabled()) {
1899          // audit log should store permission changes in addition to auth results
1900          AUDITLOG.trace("Granted permission " + perm.toString());
1901        }
1902      } else {
1903        throw new CoprocessorException(AccessController.class,
1904          "This method " + "can only execute at " + PermissionStorage.ACL_TABLE_NAME + " table.");
1905      }
1906      response = AccessControlProtos.GrantResponse.getDefaultInstance();
1907    } catch (IOException ioe) {
1908      // pass exception back up
1909      CoprocessorRpcUtils.setControllerException(controller, ioe);
1910    }
1911    done.run(response);
1912  }
1913
1914  /**
1915   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use {@link Admin#revoke(UserPermission)}
1916   *             instead.
1917   * @see Admin#revoke(UserPermission)
1918   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21739">HBASE-21739</a>
1919   */
1920  @Deprecated
1921  @Override
1922  public void revoke(RpcController controller, AccessControlProtos.RevokeRequest request,
1923    RpcCallback<AccessControlProtos.RevokeResponse> done) {
1924    final UserPermission perm = AccessControlUtil.toUserPermission(request.getUserPermission());
1925    AccessControlProtos.RevokeResponse response = null;
1926    try {
1927      // only allowed to be called on _acl_ region
1928      if (aclRegion) {
1929        if (!initialized) {
1930          throw new CoprocessorException("AccessController not yet initialized");
1931        }
1932        User caller = RpcServer.getRequestUser().orElse(null);
1933        if (LOG.isDebugEnabled()) {
1934          LOG.debug("Received request from {} to revoke access permission {}",
1935            caller.getShortName(), perm.toString());
1936        }
1937        preGrantOrRevoke(caller, "revoke", perm);
1938        // regionEnv is set at #start. Hopefully not null here.
1939        regionEnv.getConnection().getAdmin()
1940          .revoke(new UserPermission(perm.getUser(), perm.getPermission()));
1941        if (AUDITLOG.isTraceEnabled()) {
1942          // audit log should record all permission changes
1943          AUDITLOG.trace("Revoked permission " + perm.toString());
1944        }
1945      } else {
1946        throw new CoprocessorException(AccessController.class,
1947          "This method " + "can only execute at " + PermissionStorage.ACL_TABLE_NAME + " table.");
1948      }
1949      response = AccessControlProtos.RevokeResponse.getDefaultInstance();
1950    } catch (IOException ioe) {
1951      // pass exception back up
1952      CoprocessorRpcUtils.setControllerException(controller, ioe);
1953    }
1954    done.run(response);
1955  }
1956
1957  /**
1958   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1959   *             {@link Admin#getUserPermissions(GetUserPermissionsRequest)} instead.
1960   * @see Admin#getUserPermissions(GetUserPermissionsRequest)
1961   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21911">HBASE-21911</a>
1962   */
1963  @Deprecated
1964  @Override
1965  public void getUserPermissions(RpcController controller,
1966    AccessControlProtos.GetUserPermissionsRequest request,
1967    RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
1968    AccessControlProtos.GetUserPermissionsResponse response = null;
1969    try {
1970      // only allowed to be called on _acl_ region
1971      if (aclRegion) {
1972        if (!initialized) {
1973          throw new CoprocessorException("AccessController not yet initialized");
1974        }
1975        User caller = RpcServer.getRequestUser().orElse(null);
1976        final String userName = request.hasUserName() ? request.getUserName().toStringUtf8() : null;
1977        final String namespace =
1978          request.hasNamespaceName() ? request.getNamespaceName().toStringUtf8() : null;
1979        final TableName table =
1980          request.hasTableName() ? ProtobufUtil.toTableName(request.getTableName()) : null;
1981        final byte[] cf =
1982          request.hasColumnFamily() ? request.getColumnFamily().toByteArray() : null;
1983        final byte[] cq =
1984          request.hasColumnQualifier() ? request.getColumnQualifier().toByteArray() : null;
1985        preGetUserPermissions(caller, userName, namespace, table, cf, cq);
1986        GetUserPermissionsRequest getUserPermissionsRequest = null;
1987        if (request.getType() == AccessControlProtos.Permission.Type.Table) {
1988          getUserPermissionsRequest = GetUserPermissionsRequest.newBuilder(table).withFamily(cf)
1989            .withQualifier(cq).withUserName(userName).build();
1990        } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
1991          getUserPermissionsRequest =
1992            GetUserPermissionsRequest.newBuilder(namespace).withUserName(userName).build();
1993        } else {
1994          getUserPermissionsRequest =
1995            GetUserPermissionsRequest.newBuilder().withUserName(userName).build();
1996        }
1997        List<UserPermission> perms =
1998          regionEnv.getConnection().getAdmin().getUserPermissions(getUserPermissionsRequest);
1999        response = AccessControlUtil.buildGetUserPermissionsResponse(perms);
2000      } else {
2001        throw new CoprocessorException(AccessController.class,
2002          "This method " + "can only execute at " + PermissionStorage.ACL_TABLE_NAME + " table.");
2003      }
2004    } catch (IOException ioe) {
2005      // pass exception back up
2006      CoprocessorRpcUtils.setControllerException(controller, ioe);
2007    }
2008    done.run(response);
2009  }
2010
2011  /**
2012   * @deprecated since 2.2.0 and will be removed 4.0.0. Use {@link Admin#hasUserPermissions(List)}
2013   *             instead.
2014   * @see Admin#hasUserPermissions(List)
2015   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22117">HBASE-22117</a>
2016   */
2017  @Deprecated
2018  @Override
2019  public void checkPermissions(RpcController controller,
2020    AccessControlProtos.CheckPermissionsRequest request,
2021    RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2022    AccessControlProtos.CheckPermissionsResponse response = null;
2023    try {
2024      User user = RpcServer.getRequestUser().orElse(null);
2025      TableName tableName = regionEnv.getRegion().getTableDescriptor().getTableName();
2026      List<Permission> permissions = new ArrayList<>();
2027      for (int i = 0; i < request.getPermissionCount(); i++) {
2028        Permission permission = AccessControlUtil.toPermission(request.getPermission(i));
2029        permissions.add(permission);
2030        if (permission instanceof TablePermission) {
2031          TablePermission tperm = (TablePermission) permission;
2032          if (!tperm.getTableName().equals(tableName)) {
2033            throw new CoprocessorException(AccessController.class,
2034              String.format(
2035                "This method can only execute at the table specified in "
2036                  + "TablePermission. Table of the region:%s , requested table:%s",
2037                tableName, tperm.getTableName()));
2038          }
2039        }
2040      }
2041      for (Permission permission : permissions) {
2042        boolean hasPermission =
2043          accessChecker.hasUserPermission(user, "checkPermissions", permission);
2044        if (!hasPermission) {
2045          throw new AccessDeniedException("Insufficient permissions " + permission.toString());
2046        }
2047      }
2048      response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2049    } catch (IOException ioe) {
2050      CoprocessorRpcUtils.setControllerException(controller, ioe);
2051    }
2052    done.run(response);
2053  }
2054
2055  private Region getRegion(RegionCoprocessorEnvironment e) {
2056    return e.getRegion();
2057  }
2058
2059  private TableName getTableName(RegionCoprocessorEnvironment e) {
2060    Region region = e.getRegion();
2061    if (region != null) {
2062      return getTableName(region);
2063    }
2064    return null;
2065  }
2066
2067  private TableName getTableName(Region region) {
2068    RegionInfo regionInfo = region.getRegionInfo();
2069    if (regionInfo != null) {
2070      return regionInfo.getTable();
2071    }
2072    return null;
2073  }
2074
2075  @Override
2076  public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
2077    throws IOException {
2078    requirePermission(c, "preClose", Action.ADMIN);
2079  }
2080
2081  private void checkSystemOrSuperUser(User activeUser) throws IOException {
2082    // No need to check if we're not going to throw
2083    if (!authorizationEnabled) {
2084      return;
2085    }
2086    if (!Superusers.isSuperUser(activeUser)) {
2087      throw new AccessDeniedException(
2088        "User '" + (activeUser != null ? activeUser.getShortName() : "null")
2089          + "' is not system or super user.");
2090    }
2091  }
2092
2093  @Override
2094  public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2095    throws IOException {
2096    requirePermission(ctx, "preStopRegionServer", Action.ADMIN);
2097  }
2098
2099  private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family, byte[] qualifier) {
2100    if (family == null) {
2101      return null;
2102    }
2103
2104    Map<byte[], Collection<byte[]>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2105    familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2106    return familyMap;
2107  }
2108
2109  @Override
2110  public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2111    List<TableName> tableNamesList, List<TableDescriptor> descriptors, String regex)
2112    throws IOException {
2113    // We are delegating the authorization check to postGetTableDescriptors as we don't have
2114    // any concrete set of table names when a regex is present or the full list is requested.
2115    if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2116      // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2117      // request can be granted.
2118      TableName[] sns = null;
2119      try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
2120        sns = admin.listTableNames();
2121        if (sns == null) return;
2122        for (TableName tableName : tableNamesList) {
2123          // Skip checks for a table that does not exist
2124          if (!admin.tableExists(tableName)) continue;
2125          requirePermission(ctx, "getTableDescriptors", tableName, null, null, Action.ADMIN,
2126            Action.CREATE);
2127        }
2128      }
2129    }
2130  }
2131
2132  @Override
2133  public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2134    List<TableName> tableNamesList, List<TableDescriptor> descriptors, String regex)
2135    throws IOException {
2136    // Skipping as checks in this case are already done by preGetTableDescriptors.
2137    if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2138      return;
2139    }
2140
2141    // Retains only those which passes authorization checks, as the checks weren't done as part
2142    // of preGetTableDescriptors.
2143    Iterator<TableDescriptor> itr = descriptors.iterator();
2144    while (itr.hasNext()) {
2145      TableDescriptor htd = itr.next();
2146      try {
2147        requirePermission(ctx, "getTableDescriptors", htd.getTableName(), null, null, Action.ADMIN,
2148          Action.CREATE);
2149      } catch (AccessDeniedException e) {
2150        itr.remove();
2151      }
2152    }
2153  }
2154
2155  @Override
2156  public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2157    List<TableDescriptor> descriptors, String regex) throws IOException {
2158    // Retains only those which passes authorization checks.
2159    Iterator<TableDescriptor> itr = descriptors.iterator();
2160    while (itr.hasNext()) {
2161      TableDescriptor htd = itr.next();
2162      try {
2163        requireAccess(ctx, "getTableNames", htd.getTableName(), Action.values());
2164      } catch (AccessDeniedException e) {
2165        itr.remove();
2166      }
2167    }
2168  }
2169
2170  @Override
2171  public void preMergeRegions(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2172    final RegionInfo[] regionsToMerge) throws IOException {
2173    requirePermission(ctx, "mergeRegions", regionsToMerge[0].getTable(), null, null, Action.ADMIN);
2174  }
2175
2176  @Override
2177  public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2178    throws IOException {
2179    requirePermission(ctx, "preRollLogWriterRequest", Permission.Action.ADMIN);
2180  }
2181
  /** Intentionally a no-op: no action is taken after a WAL roll request. */
  @Override
  public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
    throws IOException {
  }
2186
2187  @Override
2188  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2189    final String userName, final GlobalQuotaSettings quotas) throws IOException {
2190    requirePermission(ctx, "setUserQuota", Action.ADMIN);
2191  }
2192
2193  @Override
2194  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2195    final String userName, final TableName tableName, final GlobalQuotaSettings quotas)
2196    throws IOException {
2197    requirePermission(ctx, "setUserTableQuota", tableName, null, null, Action.ADMIN);
2198  }
2199
2200  @Override
2201  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2202    final String userName, final String namespace, final GlobalQuotaSettings quotas)
2203    throws IOException {
2204    requirePermission(ctx, "setUserNamespaceQuota", Action.ADMIN);
2205  }
2206
2207  @Override
2208  public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2209    final TableName tableName, final GlobalQuotaSettings quotas) throws IOException {
2210    requirePermission(ctx, "setTableQuota", tableName, null, null, Action.ADMIN);
2211  }
2212
2213  @Override
2214  public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2215    final String namespace, final GlobalQuotaSettings quotas) throws IOException {
2216    requirePermission(ctx, "setNamespaceQuota", Action.ADMIN);
2217  }
2218
2219  @Override
2220  public void preSetRegionServerQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
2221    final String regionServer, GlobalQuotaSettings quotas) throws IOException {
2222    requirePermission(ctx, "setRegionServerQuota", Action.ADMIN);
2223  }
2224
  /**
   * Pass-through hook: returns the supplied replication endpoint unchanged.
   * @param ctx      the observer context (unused)
   * @param endpoint the endpoint that was created
   * @return the same {@code endpoint} instance
   */
  @Override
  public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
    return endpoint;
  }
2230
2231  @Override
2232  public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2233    throws IOException {
2234    requirePermission(ctx, "replicateLogEntries", Action.WRITE);
2235  }
2236
2237  @Override
2238  public void preClearCompactionQueues(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2239    throws IOException {
2240    requirePermission(ctx, "preClearCompactionQueues", Permission.Action.ADMIN);
2241  }
2242
2243  @Override
2244  public void preAddReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2245    String peerId, ReplicationPeerConfig peerConfig) throws IOException {
2246    requirePermission(ctx, "addReplicationPeer", Action.ADMIN);
2247  }
2248
2249  @Override
2250  public void preRemoveReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2251    String peerId) throws IOException {
2252    requirePermission(ctx, "removeReplicationPeer", Action.ADMIN);
2253  }
2254
2255  @Override
2256  public void preEnableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2257    String peerId) throws IOException {
2258    requirePermission(ctx, "enableReplicationPeer", Action.ADMIN);
2259  }
2260
2261  @Override
2262  public void preDisableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2263    String peerId) throws IOException {
2264    requirePermission(ctx, "disableReplicationPeer", Action.ADMIN);
2265  }
2266
2267  @Override
2268  public void preGetReplicationPeerConfig(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2269    String peerId) throws IOException {
2270    requirePermission(ctx, "getReplicationPeerConfig", Action.ADMIN);
2271  }
2272
2273  @Override
2274  public void preUpdateReplicationPeerConfig(
2275    final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
2276    ReplicationPeerConfig peerConfig) throws IOException {
2277    requirePermission(ctx, "updateReplicationPeerConfig", Action.ADMIN);
2278  }
2279
2280  @Override
2281  public void preListReplicationPeers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2282    String regex) throws IOException {
2283    requirePermission(ctx, "listReplicationPeers", Action.ADMIN);
2284  }
2285
2286  @Override
2287  public void preRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace,
2288    TableName tableName, RegionInfo[] regionInfos, String description) throws IOException {
2289    // There are operations in the CREATE and ADMIN domain which may require lock, READ
2290    // or WRITE. So for any lock request, we check for these two perms irrespective of lock type.
2291    String reason = String.format("Description=%s", description);
2292    checkLockPermissions(ctx, namespace, tableName, regionInfos, reason);
2293  }
2294
2295  @Override
2296  public void preLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
2297    TableName tableName, String description) throws IOException {
2298    checkLockPermissions(ctx, null, tableName, null, description);
2299  }
2300
2301  @Override
2302  public void preExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2303    throws IOException {
2304    checkSystemOrSuperUser(getActiveUser(ctx));
2305  }
2306
2307  @Override
2308  public void preSwitchRpcThrottle(ObserverContext<MasterCoprocessorEnvironment> ctx,
2309    boolean enable) throws IOException {
2310    requirePermission(ctx, "switchRpcThrottle", Action.ADMIN);
2311  }
2312
2313  @Override
2314  public void preIsRpcThrottleEnabled(ObserverContext<MasterCoprocessorEnvironment> ctx)
2315    throws IOException {
2316    requirePermission(ctx, "isRpcThrottleEnabled", Action.ADMIN);
2317  }
2318
2319  @Override
2320  public void preSwitchExceedThrottleQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
2321    boolean enable) throws IOException {
2322    requirePermission(ctx, "switchExceedThrottleQuota", Action.ADMIN);
2323  }
2324
2325  /**
2326   * Returns the active user to which authorization checks should be applied. If we are in the
2327   * context of an RPC call, the remote user is used, otherwise the currently logged in user is
2328   * used.
2329   */
2330  private User getActiveUser(ObserverContext<?> ctx) throws IOException {
2331    // for non-rpc handling, fallback to system user
2332    Optional<User> optionalUser = ctx.getCaller();
2333    if (optionalUser.isPresent()) {
2334      return optionalUser.get();
2335    }
2336    return userProvider.getCurrent();
2337  }
2338
2339  /**
2340   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
2341   *             {@link Admin#hasUserPermissions(String, List)} instead.
2342   * @see Admin#hasUserPermissions(String, List)
2343   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22117">HBASE-22117</a>
2344   */
2345  @Deprecated
2346  @Override
2347  public void hasPermission(RpcController controller, HasPermissionRequest request,
2348    RpcCallback<HasPermissionResponse> done) {
2349    // Converts proto to a TablePermission object.
2350    TablePermission tPerm = AccessControlUtil.toTablePermission(request.getTablePermission());
2351    // Check input user name
2352    if (!request.hasUserName()) {
2353      throw new IllegalStateException("Input username cannot be empty");
2354    }
2355    final String inputUserName = request.getUserName().toStringUtf8();
2356    AccessControlProtos.HasPermissionResponse response = null;
2357    try {
2358      User caller = RpcServer.getRequestUser().orElse(null);
2359      List<Permission> permissions = Lists.newArrayList(tPerm);
2360      preHasUserPermissions(caller, inputUserName, permissions);
2361      boolean hasPermission =
2362        regionEnv.getConnection().getAdmin().hasUserPermissions(inputUserName, permissions).get(0);
2363      response = ResponseConverter.buildHasPermissionResponse(hasPermission);
2364    } catch (IOException ioe) {
2365      ResponseConverter.setControllerException(controller, ioe);
2366    }
2367    done.run(response);
2368  }
2369
2370  @Override
2371  public void preGrant(ObserverContext<MasterCoprocessorEnvironment> ctx,
2372    UserPermission userPermission, boolean mergeExistingPermissions) throws IOException {
2373    preGrantOrRevoke(getActiveUser(ctx), "grant", userPermission);
2374  }
2375
2376  @Override
2377  public void preRevoke(ObserverContext<MasterCoprocessorEnvironment> ctx,
2378    UserPermission userPermission) throws IOException {
2379    preGrantOrRevoke(getActiveUser(ctx), "revoke", userPermission);
2380  }
2381
2382  private void preGrantOrRevoke(User caller, String request, UserPermission userPermission)
2383    throws IOException {
2384    switch (userPermission.getPermission().scope) {
2385      case GLOBAL:
2386        accessChecker.requireGlobalPermission(caller, request, Action.ADMIN, "");
2387        break;
2388      case NAMESPACE:
2389        NamespacePermission namespacePerm = (NamespacePermission) userPermission.getPermission();
2390        accessChecker.requireNamespacePermission(caller, request, namespacePerm.getNamespace(),
2391          null, Action.ADMIN);
2392        break;
2393      case TABLE:
2394        TablePermission tablePerm = (TablePermission) userPermission.getPermission();
2395        accessChecker.requirePermission(caller, request, tablePerm.getTableName(),
2396          tablePerm.getFamily(), tablePerm.getQualifier(), null, Action.ADMIN);
2397        break;
2398      default:
2399    }
2400    if (!Superusers.isSuperUser(caller)) {
2401      accessChecker.performOnSuperuser(request, caller, userPermission.getUser());
2402    }
2403  }
2404
2405  @Override
2406  public void preGetUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
2407    String userName, String namespace, TableName tableName, byte[] family, byte[] qualifier)
2408    throws IOException {
2409    preGetUserPermissions(getActiveUser(ctx), userName, namespace, tableName, family, qualifier);
2410  }
2411
2412  private void preGetUserPermissions(User caller, String userName, String namespace,
2413    TableName tableName, byte[] family, byte[] qualifier) throws IOException {
2414    if (tableName != null) {
2415      accessChecker.requirePermission(caller, "getUserPermissions", tableName, family, qualifier,
2416        userName, Action.ADMIN);
2417    } else if (namespace != null) {
2418      accessChecker.requireNamespacePermission(caller, "getUserPermissions", namespace, userName,
2419        Action.ADMIN);
2420    } else {
2421      accessChecker.requirePermission(caller, "getUserPermissions", userName, Action.ADMIN);
2422    }
2423  }
2424
2425  @Override
2426  public void preHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
2427    String userName, List<Permission> permissions) throws IOException {
2428    preHasUserPermissions(getActiveUser(ctx), userName, permissions);
2429  }
2430
2431  private void preHasUserPermissions(User caller, String userName, List<Permission> permissions)
2432    throws IOException {
2433    String request = "hasUserPermissions";
2434    for (Permission permission : permissions) {
2435      if (!caller.getShortName().equals(userName)) {
2436        // User should have admin privilege if checking permission for other users
2437        if (permission instanceof TablePermission) {
2438          TablePermission tPerm = (TablePermission) permission;
2439          accessChecker.requirePermission(caller, request, tPerm.getTableName(), tPerm.getFamily(),
2440            tPerm.getQualifier(), userName, Action.ADMIN);
2441        } else if (permission instanceof NamespacePermission) {
2442          NamespacePermission nsPerm = (NamespacePermission) permission;
2443          accessChecker.requireNamespacePermission(caller, request, nsPerm.getNamespace(), userName,
2444            Action.ADMIN);
2445        } else {
2446          accessChecker.requirePermission(caller, request, userName, Action.ADMIN);
2447        }
2448      } else {
2449        // User don't need ADMIN privilege for self check.
2450        // Setting action as null in AuthResult to display empty action in audit log
2451        AuthResult result;
2452        if (permission instanceof TablePermission) {
2453          TablePermission tPerm = (TablePermission) permission;
2454          result = AuthResult.allow(request, "Self user validation allowed", caller, null,
2455            tPerm.getTableName(), tPerm.getFamily(), tPerm.getQualifier());
2456        } else if (permission instanceof NamespacePermission) {
2457          NamespacePermission nsPerm = (NamespacePermission) permission;
2458          result = AuthResult.allow(request, "Self user validation allowed", caller, null,
2459            nsPerm.getNamespace());
2460        } else {
2461          result = AuthResult.allow(request, "Self user validation allowed", caller, null, null,
2462            null, null);
2463        }
2464        AccessChecker.logResult(result);
2465      }
2466    }
2467  }
2468
2469  @Override
2470  public void preClearRegionBlockCache(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2471    throws IOException {
2472    accessChecker.requirePermission(getActiveUser(ctx), "clearRegionBlockCache", null,
2473      Permission.Action.ADMIN);
2474  }
2475
2476  @Override
2477  public void preUpdateRegionServerConfiguration(
2478    ObserverContext<RegionServerCoprocessorEnvironment> ctx, Configuration preReloadConf)
2479    throws IOException {
2480    accessChecker.requirePermission(getActiveUser(ctx), "updateConfiguration", null,
2481      Permission.Action.ADMIN);
2482  }
2483
2484  @Override
2485  public void preUpdateMasterConfiguration(ObserverContext<MasterCoprocessorEnvironment> ctx,
2486    Configuration preReloadConf) throws IOException {
2487    accessChecker.requirePermission(getActiveUser(ctx), "updateConfiguration", null,
2488      Permission.Action.ADMIN);
2489  }
2490}