001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.access;
019
020import java.io.IOException;
021import java.security.PrivilegedExceptionAction;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Optional;
031import java.util.Set;
032import java.util.TreeMap;
033import java.util.TreeSet;
034import java.util.stream.Collectors;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.ArrayBackedTag;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.CellUtil;
039import org.apache.hadoop.hbase.CompareOperator;
040import org.apache.hadoop.hbase.CompoundConfiguration;
041import org.apache.hadoop.hbase.CoprocessorEnvironment;
042import org.apache.hadoop.hbase.DoNotRetryIOException;
043import org.apache.hadoop.hbase.ExtendedCell;
044import org.apache.hadoop.hbase.ExtendedCellScanner;
045import org.apache.hadoop.hbase.HBaseInterfaceAudience;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.KeyValue.Type;
049import org.apache.hadoop.hbase.NamespaceDescriptor;
050import org.apache.hadoop.hbase.PrivateCellUtil;
051import org.apache.hadoop.hbase.ServerName;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.Tag;
054import org.apache.hadoop.hbase.client.Admin;
055import org.apache.hadoop.hbase.client.Append;
056import org.apache.hadoop.hbase.client.BalanceRequest;
057import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
058import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
059import org.apache.hadoop.hbase.client.Delete;
060import org.apache.hadoop.hbase.client.Durability;
061import org.apache.hadoop.hbase.client.Get;
062import org.apache.hadoop.hbase.client.Increment;
063import org.apache.hadoop.hbase.client.MasterSwitchType;
064import org.apache.hadoop.hbase.client.Mutation;
065import org.apache.hadoop.hbase.client.Put;
066import org.apache.hadoop.hbase.client.Query;
067import org.apache.hadoop.hbase.client.RegionInfo;
068import org.apache.hadoop.hbase.client.Result;
069import org.apache.hadoop.hbase.client.Scan;
070import org.apache.hadoop.hbase.client.SnapshotDescription;
071import org.apache.hadoop.hbase.client.Table;
072import org.apache.hadoop.hbase.client.TableDescriptor;
073import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
074import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
075import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
076import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
077import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
078import org.apache.hadoop.hbase.coprocessor.HasMasterServices;
079import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
080import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
081import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
082import org.apache.hadoop.hbase.coprocessor.MasterObserver;
083import org.apache.hadoop.hbase.coprocessor.ObserverContext;
084import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
085import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
086import org.apache.hadoop.hbase.coprocessor.RegionObserver;
087import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
088import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
089import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
090import org.apache.hadoop.hbase.filter.ByteArrayComparable;
091import org.apache.hadoop.hbase.filter.Filter;
092import org.apache.hadoop.hbase.filter.FilterList;
093import org.apache.hadoop.hbase.io.hfile.HFile;
094import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
095import org.apache.hadoop.hbase.ipc.RpcServer;
096import org.apache.hadoop.hbase.master.MasterServices;
097import org.apache.hadoop.hbase.net.Address;
098import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings;
099import org.apache.hadoop.hbase.regionserver.BloomType;
100import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
101import org.apache.hadoop.hbase.regionserver.InternalScanner;
102import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
103import org.apache.hadoop.hbase.regionserver.Region;
104import org.apache.hadoop.hbase.regionserver.RegionScanner;
105import org.apache.hadoop.hbase.regionserver.RegionServerServices;
106import org.apache.hadoop.hbase.regionserver.ScanType;
107import org.apache.hadoop.hbase.regionserver.ScannerContext;
108import org.apache.hadoop.hbase.regionserver.Store;
109import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
110import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
111import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
112import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
113import org.apache.hadoop.hbase.replication.SyncReplicationState;
114import org.apache.hadoop.hbase.security.AccessDeniedException;
115import org.apache.hadoop.hbase.security.Superusers;
116import org.apache.hadoop.hbase.security.User;
117import org.apache.hadoop.hbase.security.UserProvider;
118import org.apache.hadoop.hbase.security.access.Permission.Action;
119import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
120import org.apache.hadoop.hbase.util.ByteRange;
121import org.apache.hadoop.hbase.util.Bytes;
122import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
123import org.apache.hadoop.hbase.util.Pair;
124import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
125import org.apache.hadoop.hbase.wal.WALEdit;
126import org.apache.yetus.audience.InterfaceAudience;
127import org.slf4j.Logger;
128import org.slf4j.LoggerFactory;
129
130import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
131import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
132import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
133import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
134import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
135import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
136import org.apache.hbase.thirdparty.com.google.protobuf.Message;
137import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
138import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
139import org.apache.hbase.thirdparty.com.google.protobuf.Service;
140
141import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
142import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.AccessControlService;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasPermissionRequest;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasPermissionResponse;
147
148/**
149 * Provides basic authorization checks for data access and administrative operations.
150 * <p>
151 * {@code AccessController} performs authorization checks for HBase operations based on:
152 * </p>
153 * <ul>
154 * <li>the identity of the user performing the operation</li>
155 * <li>the scope over which the operation is performed, in increasing specificity: global, table,
156 * column family, or qualifier</li>
157 * <li>the type of action being performed (as mapped to {@link Permission.Action} values)</li>
158 * </ul>
159 * <p>
160 * If the authorization check fails, an {@link AccessDeniedException} will be thrown for the
161 * operation.
162 * </p>
163 * <p>
164 * To perform authorization checks, {@code AccessController} relies on the RpcServerEngine being
165 * loaded to provide the user identities for remote requests.
166 * </p>
167 * <p>
168 * The access control lists used for authorization can be manipulated via the exposed
169 * {@link AccessControlService} Interface implementation, and the associated {@code grant},
170 * {@code revoke}, and {@code user_permission} HBase shell commands.
171 * </p>
172 */
173@CoreCoprocessor
174@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
175public class AccessController implements MasterCoprocessor, RegionCoprocessor,
176  RegionServerCoprocessor, AccessControlService.Interface, MasterObserver, RegionObserver,
177  RegionServerObserver, EndpointObserver, BulkLoadObserver {
178  // TODO: encapsulate observer functions into separate class/sub-class.
179
180  private static final Logger LOG = LoggerFactory.getLogger(AccessController.class);
181
182  private static final Logger AUDITLOG =
183    LoggerFactory.getLogger("SecurityLogger." + AccessController.class.getName());
184  private static final String CHECK_COVERING_PERM = "check_covering_perm";
185  private static final String TAG_CHECK_PASSED = "tag_check_passed";
186  private static final byte[] TRUE = Bytes.toBytes(true);
187
188  private AccessChecker accessChecker;
189  private ZKPermissionWatcher zkPermissionWatcher;
190
191  /** flags if we are running on a region of the _acl_ table */
192  private boolean aclRegion = false;
193
194  /**
195   * defined only for Endpoint implementation, so it can have way to access region services
196   */
197  private RegionCoprocessorEnvironment regionEnv;
198
199  /** Mapping of scanner instances to the user who created them */
200  private Map<InternalScanner, String> scannerOwners = new MapMaker().weakKeys().makeMap();
201
202  private Map<TableName, List<UserPermission>> tableAcls;
203
204  /** Provider for mapping principal names to Users */
205  private UserProvider userProvider;
206
207  /**
208   * if we are active, usually false, only true if "hbase.security.authorization" has been set to
209   * true in site configuration
210   */
211  private boolean authorizationEnabled;
212
213  /** if we are able to support cell ACLs */
214  private boolean cellFeaturesEnabled;
215
216  /** if we should check EXEC permissions */
217  private boolean shouldCheckExecPermission;
218
219  /**
220   * if we should terminate access checks early as soon as table or CF grants allow access; pre-0.98
221   * compatible behavior
222   */
223  private boolean compatibleEarlyTermination;
224
225  /** if we have been successfully initialized */
226  private volatile boolean initialized = false;
227
228  /** if the ACL table is available, only relevant in the master */
229  private volatile boolean aclTabAvailable = false;
230
231  public static boolean isCellAuthorizationSupported(Configuration conf) {
232    return AccessChecker.isAuthorizationSupported(conf)
233      && (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
234  }
235
236  public Region getRegion() {
237    return regionEnv != null ? regionEnv.getRegion() : null;
238  }
239
240  public AuthManager getAuthManager() {
241    return accessChecker.getAuthManager();
242  }
243
244  private void initialize(RegionCoprocessorEnvironment e) throws IOException {
245    final Region region = e.getRegion();
246    Configuration conf = e.getConfiguration();
247    Map<byte[], ListMultimap<String, UserPermission>> tables = PermissionStorage.loadAll(region);
248    // For each table, write out the table's permissions to the respective
249    // znode for that table.
250    for (Map.Entry<byte[], ListMultimap<String, UserPermission>> t : tables.entrySet()) {
251      byte[] entry = t.getKey();
252      ListMultimap<String, UserPermission> perms = t.getValue();
253      byte[] serialized = PermissionStorage.writePermissionsAsBytes(perms, conf);
254      zkPermissionWatcher.writeToZookeeper(entry, serialized);
255    }
256    initialized = true;
257  }
258
259  /**
260   * Writes all table ACLs for the tables in the given Map up into ZooKeeper znodes. This is called
261   * to synchronize ACL changes following {@code _acl_} table updates.
262   */
263  private void updateACL(RegionCoprocessorEnvironment e, final Map<byte[], List<Cell>> familyMap) {
264    Set<byte[]> entries = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
265    for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
266      List<Cell> cells = f.getValue();
267      for (Cell cell : cells) {
268        if (CellUtil.matchingFamily(cell, PermissionStorage.ACL_LIST_FAMILY)) {
269          entries.add(CellUtil.cloneRow(cell));
270        }
271      }
272    }
273    Configuration conf = regionEnv.getConfiguration();
274    byte[] currentEntry = null;
275    // TODO: Here we are already on the ACL region. (And it is single
276    // region) We can even just get the region from the env and do get
277    // directly. The short circuit connection would avoid the RPC overhead
278    // so no socket communication, req write/read .. But we have the PB
279    // to and fro conversion overhead. get req is converted to PB req
280    // and results are converted to PB results 1st and then to POJOs
281    // again. We could have avoided such at least in ACL table context..
282    try (Table t = e.getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
283      for (byte[] entry : entries) {
284        currentEntry = entry;
285        ListMultimap<String, UserPermission> perms =
286          PermissionStorage.getPermissions(conf, entry, t, null, null, null, false);
287        byte[] serialized = PermissionStorage.writePermissionsAsBytes(perms, conf);
288        zkPermissionWatcher.writeToZookeeper(entry, serialized);
289      }
290    } catch (IOException ex) {
291      LOG.error("Failed updating permissions mirror for '"
292        + (currentEntry == null ? "null" : Bytes.toString(currentEntry)) + "'", ex);
293    }
294  }
295
296  /**
297   * Check the current user for authorization to perform a specific action against the given set of
298   * row data.
299   * @param opType   the operation type
300   * @param user     the user
301   * @param e        the coprocessor environment
302   * @param families the map of column families to qualifiers present in the request
303   * @param actions  the desired actions
304   * @return an authorization result
305   */
306  private AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
307    Map<byte[], ? extends Collection<?>> families, Action... actions) {
308    AuthResult result = null;
309    for (Action action : actions) {
310      result = accessChecker.permissionGranted(opType.toString(), user, action,
311        e.getRegion().getRegionInfo().getTable(), families);
312      if (!result.isAllowed()) {
313        return result;
314      }
315    }
316    return result;
317  }
318
319  public void requireAccess(ObserverContext<?> ctx, String request, TableName tableName,
320    Action... permissions) throws IOException {
321    accessChecker.requireAccess(getActiveUser(ctx), request, tableName, permissions);
322  }
323
324  public void requirePermission(ObserverContext<?> ctx, String request, Action perm)
325    throws IOException {
326    accessChecker.requirePermission(getActiveUser(ctx), request, null, perm);
327  }
328
329  public void requireGlobalPermission(ObserverContext<?> ctx, String request, Action perm,
330    TableName tableName, Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
331    accessChecker.requireGlobalPermission(getActiveUser(ctx), request, perm, tableName, familyMap,
332      null);
333  }
334
335  public void requireGlobalPermission(ObserverContext<?> ctx, String request, Action perm,
336    String namespace) throws IOException {
337    accessChecker.requireGlobalPermission(getActiveUser(ctx), request, perm, namespace);
338  }
339
340  public void requireNamespacePermission(ObserverContext<?> ctx, String request, String namespace,
341    Action... permissions) throws IOException {
342    accessChecker.requireNamespacePermission(getActiveUser(ctx), request, namespace, null,
343      permissions);
344  }
345
346  public void requireNamespacePermission(ObserverContext<?> ctx, String request, String namespace,
347    TableName tableName, Map<byte[], ? extends Collection<byte[]>> familyMap, Action... permissions)
348    throws IOException {
349    accessChecker.requireNamespacePermission(getActiveUser(ctx), request, namespace, tableName,
350      familyMap, permissions);
351  }
352
353  public void requirePermission(ObserverContext<?> ctx, String request, TableName tableName,
354    byte[] family, byte[] qualifier, Action... permissions) throws IOException {
355    accessChecker.requirePermission(getActiveUser(ctx), request, tableName, family, qualifier, null,
356      permissions);
357  }
358
359  public void requireTablePermission(ObserverContext<?> ctx, String request, TableName tableName,
360    byte[] family, byte[] qualifier, Action... permissions) throws IOException {
361    accessChecker.requireTablePermission(getActiveUser(ctx), request, tableName, family, qualifier,
362      permissions);
363  }
364
365  public void checkLockPermissions(ObserverContext<?> ctx, String namespace, TableName tableName,
366    RegionInfo[] regionInfos, String reason) throws IOException {
367    accessChecker.checkLockPermissions(getActiveUser(ctx), namespace, tableName, regionInfos,
368      reason);
369  }
370
371  /**
372   * Returns <code>true</code> if the current user is allowed the given action over at least one of
373   * the column qualifiers in the given column families.
374   */
375  private boolean hasFamilyQualifierPermission(User user, Action perm,
376    RegionCoprocessorEnvironment env, Map<byte[], ? extends Collection<byte[]>> familyMap)
377    throws IOException {
378    RegionInfo hri = env.getRegion().getRegionInfo();
379    TableName tableName = hri.getTable();
380
381    if (user == null) {
382      return false;
383    }
384
385    if (familyMap != null && familyMap.size() > 0) {
386      // at least one family must be allowed
387      for (Map.Entry<byte[], ? extends Collection<byte[]>> family : familyMap.entrySet()) {
388        if (family.getValue() != null && !family.getValue().isEmpty()) {
389          for (byte[] qualifier : family.getValue()) {
390            if (
391              getAuthManager().authorizeUserTable(user, tableName, family.getKey(), qualifier, perm)
392            ) {
393              return true;
394            }
395          }
396        } else {
397          if (getAuthManager().authorizeUserFamily(user, tableName, family.getKey(), perm)) {
398            return true;
399          }
400        }
401      }
402    } else if (LOG.isDebugEnabled()) {
403      LOG.debug("Empty family map passed for permission check");
404    }
405
406    return false;
407  }
408
409  private enum OpType {
410    GET("get"),
411    EXISTS("exists"),
412    SCAN("scan"),
413    PUT("put"),
414    DELETE("delete"),
415    CHECK_AND_PUT("checkAndPut"),
416    CHECK_AND_DELETE("checkAndDelete"),
417    APPEND("append"),
418    INCREMENT("increment");
419
420    private String type;
421
422    private OpType(String type) {
423      this.type = type;
424    }
425
426    @Override
427    public String toString() {
428      return type;
429    }
430  }
431
432  /**
433   * Determine if cell ACLs covered by the operation grant access. This is expensive.
434   * @return false if cell ACLs failed to grant access, true otherwise
435   */
436  private boolean checkCoveringPermission(User user, OpType request, RegionCoprocessorEnvironment e,
437    byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
438    throws IOException {
439    if (!cellFeaturesEnabled) {
440      return false;
441    }
442    long cellGrants = 0;
443    long latestCellTs = 0;
444    Get get = new Get(row);
445    // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
446    // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
447    // version. We have to get every cell version and check its TS against the TS asked for in
448    // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
449    // consider only one such passing cell. In case of Delete we have to consider all the cell
450    // versions under this passing version. When Delete Mutation contains columns which are a
451    // version delete just consider only one version for those column cells.
452    boolean considerCellTs = (request == OpType.PUT || request == OpType.DELETE);
453    if (considerCellTs) {
454      get.readAllVersions();
455    } else {
456      get.readVersions(1);
457    }
458    boolean diffCellTsFromOpTs = false;
459    for (Map.Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
460      byte[] col = entry.getKey();
461      // TODO: HBASE-7114 could possibly unify the collection type in family
462      // maps so we would not need to do this
463      if (entry.getValue() instanceof Set) {
464        Set<byte[]> set = (Set<byte[]>) entry.getValue();
465        if (set == null || set.isEmpty()) {
466          get.addFamily(col);
467        } else {
468          for (byte[] qual : set) {
469            get.addColumn(col, qual);
470          }
471        }
472      } else if (entry.getValue() instanceof List) {
473        List<ExtendedCell> list = (List<ExtendedCell>) entry.getValue();
474        if (list == null || list.isEmpty()) {
475          get.addFamily(col);
476        } else {
477          // In case of family delete, a Cell will be added into the list with Qualifier as null.
478          for (ExtendedCell cell : list) {
479            if (
480              cell.getQualifierLength() == 0 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
481                || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())
482            ) {
483              get.addFamily(col);
484            } else {
485              get.addColumn(col, CellUtil.cloneQualifier(cell));
486            }
487            if (considerCellTs) {
488              long cellTs = cell.getTimestamp();
489              latestCellTs = Math.max(latestCellTs, cellTs);
490              diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
491            }
492          }
493        }
494      } else if (entry.getValue() == null) {
495        get.addFamily(col);
496      } else {
497        throw new RuntimeException(
498          "Unhandled collection type " + entry.getValue().getClass().getName());
499      }
500    }
501    // We want to avoid looking into the future. So, if the cells of the
502    // operation specify a timestamp, or the operation itself specifies a
503    // timestamp, then we use the maximum ts found. Otherwise, we bound
504    // the Get to the current server time. We add 1 to the timerange since
505    // the upper bound of a timerange is exclusive yet we need to examine
506    // any cells found there inclusively.
507    long latestTs = Math.max(opTs, latestCellTs);
508    if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
509      latestTs = EnvironmentEdgeManager.currentTime();
510    }
511    get.setTimeRange(0, latestTs + 1);
512    // In case of Put operation we set to read all versions. This was done to consider the case
513    // where columns are added with TS other than the Mutation TS. But normally this wont be the
514    // case with Put. There no need to get all versions but get latest version only.
515    if (!diffCellTsFromOpTs && request == OpType.PUT) {
516      get.readVersions(1);
517    }
518    if (LOG.isTraceEnabled()) {
519      LOG.trace("Scanning for cells with " + get);
520    }
521    // This Map is identical to familyMap. The key is a BR rather than byte[].
522    // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
523    // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
524    Map<ByteRange, List<Cell>> familyMap1 = new HashMap<>();
525    for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
526      if (entry.getValue() instanceof List) {
527        familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
528      }
529    }
530    RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
531    List<Cell> cells = Lists.newArrayList();
532    Cell prevCell = null;
533    ByteRange curFam = new SimpleMutableByteRange();
534    boolean curColAllVersions = (request == OpType.DELETE);
535    long curColCheckTs = opTs;
536    boolean foundColumn = false;
537    try {
538      boolean more = false;
539      ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();
540
541      do {
542        cells.clear();
543        // scan with limit as 1 to hold down memory use on wide rows
544        more = scanner.next(cells, scannerContext);
545        for (Cell cell : cells) {
546          if (LOG.isTraceEnabled()) {
547            LOG.trace("Found cell " + cell);
548          }
549          boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
550          if (colChange) foundColumn = false;
551          prevCell = cell;
552          if (!curColAllVersions && foundColumn) {
553            continue;
554          }
555          if (colChange && considerCellTs) {
556            curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
557            List<Cell> cols = familyMap1.get(curFam);
558            for (Cell col : cols) {
559              // null/empty qualifier is used to denote a Family delete. The TS and delete type
560              // associated with this is applicable for all columns within the family. That is
561              // why the below (col.getQualifierLength() == 0) check.
562              if (
563                (col.getQualifierLength() == 0 && request == OpType.DELETE)
564                  || CellUtil.matchingQualifier(cell, col)
565              ) {
566                byte type = PrivateCellUtil.getTypeByte(col);
567                if (considerCellTs) {
568                  curColCheckTs = col.getTimestamp();
569                }
570                // For a Delete op we pass allVersions as true. When a Delete Mutation contains
571                // a version delete for a column no need to check all the covering cells within
572                // that column. Check all versions when Type is DeleteColumn or DeleteFamily
573                // One version delete types are Delete/DeleteFamilyVersion
574                curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
575                  || (KeyValue.Type.DeleteFamily.getCode() == type);
576                break;
577              }
578            }
579          }
580          if (cell.getTimestamp() > curColCheckTs) {
581            // Just ignore this cell. This is not a covering cell.
582            continue;
583          }
584          foundColumn = true;
585          for (Action action : actions) {
586            // Are there permissions for this user for the cell?
587            if (!getAuthManager().authorizeCell(user, getTableName(e), cell, action)) {
588              // We can stop if the cell ACL denies access
589              return false;
590            }
591          }
592          cellGrants++;
593        }
594      } while (more);
595    } catch (AccessDeniedException ex) {
596      throw ex;
597    } catch (IOException ex) {
598      LOG.error("Exception while getting cells to calculate covering permission", ex);
599    } finally {
600      scanner.close();
601    }
602    // We should not authorize unless we have found one or more cell ACLs that
603    // grant access. This code is used to check for additional permissions
604    // after no table or CF grants are found.
605    return cellGrants > 0;
606  }
607
608  private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
609    // Iterate over the entries in the familyMap, replacing the cells therein
610    // with new cells including the ACL data
611    for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
612      List<Cell> newCells = Lists.newArrayList();
613      for (Cell c : e.getValue()) {
614        assert c instanceof ExtendedCell;
615        ExtendedCell cell = (ExtendedCell) c;
616        // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
617        List<Tag> tags = new ArrayList<>();
618        tags.add(new ArrayBackedTag(PermissionStorage.ACL_TAG_TYPE, perms));
619        Iterator<Tag> tagIterator = PrivateCellUtil.tagsIterator(cell);
620        while (tagIterator.hasNext()) {
621          tags.add(tagIterator.next());
622        }
623        newCells.add(PrivateCellUtil.createCell(cell, tags));
624      }
625      // This is supposed to be safe, won't CME
626      e.setValue(newCells);
627    }
628  }
629
630  // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
631  // type is reserved and should not be explicitly set by user.
632  private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
633    // No need to check if we're not going to throw
634    if (!authorizationEnabled) {
635      m.setAttribute(TAG_CHECK_PASSED, TRUE);
636      return;
637    }
638    // Superusers are allowed to store cells unconditionally.
639    if (Superusers.isSuperUser(user)) {
640      m.setAttribute(TAG_CHECK_PASSED, TRUE);
641      return;
642    }
643    // We already checked (prePut vs preBatchMutation)
644    if (m.getAttribute(TAG_CHECK_PASSED) != null) {
645      return;
646    }
647    for (ExtendedCellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
648      Iterator<Tag> tagsItr = PrivateCellUtil.tagsIterator(cellScanner.current());
649      while (tagsItr.hasNext()) {
650        if (tagsItr.next().getType() == PermissionStorage.ACL_TAG_TYPE) {
651          throw new AccessDeniedException("Mutation contains cell with reserved type tag");
652        }
653      }
654    }
655    m.setAttribute(TAG_CHECK_PASSED, TRUE);
656  }
657
658  /* ---- MasterObserver implementation ---- */
659  @Override
660  public void start(CoprocessorEnvironment env) throws IOException {
661    CompoundConfiguration conf = new CompoundConfiguration();
662    conf.add(env.getConfiguration());
663
664    authorizationEnabled = AccessChecker.isAuthorizationSupported(conf);
665    if (!authorizationEnabled) {
666      LOG.warn("AccessController has been loaded with authorization checks DISABLED!");
667    }
668
669    shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
670      AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
671
672    cellFeaturesEnabled = (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
673    if (!cellFeaturesEnabled) {
674      LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
675        + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
676        + " accordingly.");
677    }
678
679    if (env instanceof MasterCoprocessorEnvironment) {
680      // if running on HMaster
681      MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
682      if (mEnv instanceof HasMasterServices) {
683        MasterServices masterServices = ((HasMasterServices) mEnv).getMasterServices();
684        zkPermissionWatcher = masterServices.getZKPermissionWatcher();
685        accessChecker = masterServices.getAccessChecker();
686      }
687    } else if (env instanceof RegionServerCoprocessorEnvironment) {
688      RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
689      if (rsEnv instanceof HasRegionServerServices) {
690        RegionServerServices rsServices =
691          ((HasRegionServerServices) rsEnv).getRegionServerServices();
692        zkPermissionWatcher = rsServices.getZKPermissionWatcher();
693        accessChecker = rsServices.getAccessChecker();
694      }
695    } else if (env instanceof RegionCoprocessorEnvironment) {
696      // if running at region
697      regionEnv = (RegionCoprocessorEnvironment) env;
698      conf.addBytesMap(regionEnv.getRegion().getTableDescriptor().getValues());
699      compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
700        AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
701      if (regionEnv instanceof HasRegionServerServices) {
702        RegionServerServices rsServices =
703          ((HasRegionServerServices) regionEnv).getRegionServerServices();
704        zkPermissionWatcher = rsServices.getZKPermissionWatcher();
705        accessChecker = rsServices.getAccessChecker();
706      }
707    }
708
709    Preconditions.checkState(zkPermissionWatcher != null, "ZKPermissionWatcher is null");
710    Preconditions.checkState(accessChecker != null, "AccessChecker is null");
711
712    // set the user-provider.
713    this.userProvider = UserProvider.instantiate(env.getConfiguration());
714    tableAcls = new MapMaker().weakValues().makeMap();
715  }
716
717  @Override
718  public void stop(CoprocessorEnvironment env) {
719  }
720
721  /*********************************** Observer/Service Getters ***********************************/
722  @Override
723  public Optional<RegionObserver> getRegionObserver() {
724    return Optional.of(this);
725  }
726
727  @Override
728  public Optional<MasterObserver> getMasterObserver() {
729    return Optional.of(this);
730  }
731
732  @Override
733  public Optional<EndpointObserver> getEndpointObserver() {
734    return Optional.of(this);
735  }
736
737  @Override
738  public Optional<BulkLoadObserver> getBulkLoadObserver() {
739    return Optional.of(this);
740  }
741
742  @Override
743  public Optional<RegionServerObserver> getRegionServerObserver() {
744    return Optional.of(this);
745  }
746
747  @Override
748  public Iterable<Service> getServices() {
749    return Collections
750      .singleton(AccessControlProtos.AccessControlService.newReflectiveService(this));
751  }
752
753  /*********************************** Observer implementations ***********************************/
754
755  @Override
756  public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c, TableDescriptor desc,
757    RegionInfo[] regions) throws IOException {
758    Set<byte[]> families = desc.getColumnFamilyNames();
759    Map<byte[], Set<byte[]>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
760    for (byte[] family : families) {
761      familyMap.put(family, null);
762    }
763    requireNamespacePermission(c, "createTable", desc.getTableName().getNamespaceAsString(),
764      desc.getTableName(), familyMap, Action.ADMIN, Action.CREATE);
765  }
766
767  @Override
768  public void postCompletedCreateTableAction(final ObserverContext<MasterCoprocessorEnvironment> c,
769    final TableDescriptor desc, final RegionInfo[] regions) throws IOException {
770    // When AC is used, it should be configured as the 1st CP.
771    // In Master, the table operations like create, are handled by a Thread pool but the max size
772    // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
773    // sequentially only.
774    // Related code in HMaster#startServiceThreads
775    // {code}
776    // // We depend on there being only one instance of this executor running
777    // // at a time. To do concurrency, would need fencing of enable/disable of
778    // // tables.
779    // this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
780    // {code}
781    // In future if we change this pool to have more threads, then there is a chance for thread,
782    // creating acl table, getting delayed and by that time another table creation got over and
783    // this hook is getting called. In such a case, we will need a wait logic here which will
784    // wait till the acl table is created.
785    if (PermissionStorage.isAclTable(desc)) {
786      this.aclTabAvailable = true;
787    } else {
788      if (!aclTabAvailable) {
789        LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
790          + PermissionStorage.ACL_TABLE_NAME + " is not yet created. " + getClass().getSimpleName()
791          + " should be configured as the first Coprocessor");
792      } else {
793        String owner = getActiveUser(c).getShortName();
794        final UserPermission userPermission = new UserPermission(owner,
795          Permission.newBuilder(desc.getTableName()).withActions(Action.values()).build());
796        // switch to the real hbase master user for doing the RPC on the ACL table
797        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
798          @Override
799          public Void run() throws Exception {
800            try (Table table =
801              c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
802              PermissionStorage.addUserPermission(c.getEnvironment().getConfiguration(),
803                userPermission, table);
804            }
805            return null;
806          }
807        });
808      }
809    }
810  }
811
812  @Override
813  public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
814    throws IOException {
815    requirePermission(c, "deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
816  }
817
818  @Override
819  public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
820    final TableName tableName) throws IOException {
821    final Configuration conf = c.getEnvironment().getConfiguration();
822    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
823      @Override
824      public Void run() throws Exception {
825        try (Table table =
826          c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
827          PermissionStorage.removeTablePermissions(conf, tableName, table);
828        }
829        return null;
830      }
831    });
832    zkPermissionWatcher.deleteTableACLNode(tableName);
833  }
834
835  @Override
836  public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
837    final TableName tableName) throws IOException {
838    requirePermission(c, "truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
839
840    final Configuration conf = c.getEnvironment().getConfiguration();
841    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
842      @Override
843      public Void run() throws Exception {
844        List<UserPermission> acls =
845          PermissionStorage.getUserTablePermissions(conf, tableName, null, null, null, false);
846        if (acls != null) {
847          tableAcls.put(tableName, acls);
848        }
849        return null;
850      }
851    });
852  }
853
854  @Override
855  public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
856    final TableName tableName) throws IOException {
857    final Configuration conf = ctx.getEnvironment().getConfiguration();
858    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
859      @Override
860      public Void run() throws Exception {
861        List<UserPermission> perms = tableAcls.get(tableName);
862        if (perms != null) {
863          for (UserPermission perm : perms) {
864            try (Table table =
865              ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
866              PermissionStorage.addUserPermission(conf, perm, table);
867            }
868          }
869        }
870        tableAcls.remove(tableName);
871        return null;
872      }
873    });
874  }
875
876  @Override
877  public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
878    TableName tableName, TableDescriptor currentDesc, TableDescriptor newDesc) throws IOException {
879    // TODO: potentially check if this is a add/modify/delete column operation
880    requirePermission(c, "modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
881    return newDesc;
882  }
883
884  @Override
885  public String preModifyTableStoreFileTracker(ObserverContext<MasterCoprocessorEnvironment> c,
886    TableName tableName, String dstSFT) throws IOException {
887    requirePermission(c, "modifyTableStoreFileTracker", tableName, null, null, Action.ADMIN,
888      Action.CREATE);
889    return dstSFT;
890  }
891
892  @Override
893  public String preModifyColumnFamilyStoreFileTracker(
894    ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName, byte[] family,
895    String dstSFT) throws IOException {
896    requirePermission(c, "modifyColumnFamilyStoreFileTracker", tableName, family, null,
897      Action.ADMIN, Action.CREATE);
898    return dstSFT;
899  }
900
901  @Override
902  public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
903    TableDescriptor oldDesc, TableDescriptor currentDesc) throws IOException {
904    final Configuration conf = c.getEnvironment().getConfiguration();
905    // default the table owner to current user, if not specified.
906    final String owner = getActiveUser(c).getShortName();
907    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
908      @Override
909      public Void run() throws Exception {
910        UserPermission userperm = new UserPermission(owner,
911          Permission.newBuilder(currentDesc.getTableName()).withActions(Action.values()).build());
912        try (Table table =
913          c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
914          PermissionStorage.addUserPermission(conf, userperm, table);
915        }
916        return null;
917      }
918    });
919  }
920
921  @Override
922  public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
923    throws IOException {
924    requirePermission(c, "enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
925  }
926
927  @Override
928  public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
929    throws IOException {
930    if (Bytes.equals(tableName.getName(), PermissionStorage.ACL_GLOBAL_NAME)) {
931      // We have to unconditionally disallow disable of the ACL table when we are installed,
932      // even if not enforcing authorizations. We are still allowing grants and revocations,
933      // checking permissions and logging audit messages, etc. If the ACL table is not
934      // available we will fail random actions all over the place.
935      throw new AccessDeniedException("Not allowed to disable " + PermissionStorage.ACL_TABLE_NAME
936        + " table with AccessController installed");
937    }
938    requirePermission(c, "disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
939  }
940
941  @Override
942  public void preAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx,
943    final long procId) throws IOException {
944    requirePermission(ctx, "abortProcedure", Action.ADMIN);
945  }
946
947  @Override
948  public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
949    throws IOException {
950    // There is nothing to do at this time after the procedure abort request was sent.
951  }
952
953  @Override
954  public void preGetProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
955    throws IOException {
956    requirePermission(ctx, "getProcedure", Action.ADMIN);
957  }
958
959  @Override
960  public void preGetLocks(ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
961    User user = getActiveUser(ctx);
962    accessChecker.requirePermission(user, "getLocks", null, Action.ADMIN);
963  }
964
965  @Override
966  public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo region,
967    ServerName srcServer, ServerName destServer) throws IOException {
968    requirePermission(c, "move", region.getTable(), null, null, Action.ADMIN);
969  }
970
971  @Override
972  public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo regionInfo)
973    throws IOException {
974    requirePermission(c, "assign", regionInfo.getTable(), null, null, Action.ADMIN);
975  }
976
977  @Override
978  public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo regionInfo)
979    throws IOException {
980    requirePermission(c, "unassign", regionInfo.getTable(), null, null, Action.ADMIN);
981  }
982
983  @Override
984  public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
985    RegionInfo regionInfo) throws IOException {
986    requirePermission(c, "regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
987  }
988
989  @Override
990  public void preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
991    final boolean newValue, final MasterSwitchType switchType) throws IOException {
992    requirePermission(ctx, "setSplitOrMergeEnabled", Action.ADMIN);
993  }
994
995  @Override
996  public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c, BalanceRequest request)
997    throws IOException {
998    requirePermission(c, "balance", Action.ADMIN);
999  }
1000
1001  @Override
1002  public void preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c, boolean newValue)
1003    throws IOException {
1004    requirePermission(c, "balanceSwitch", Action.ADMIN);
1005  }
1006
1007  @Override
1008  public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c) throws IOException {
1009    requirePermission(c, "shutdown", Action.ADMIN);
1010  }
1011
1012  @Override
1013  public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c) throws IOException {
1014    requirePermission(c, "stopMaster", Action.ADMIN);
1015  }
1016
1017  @Override
1018  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1019    throws IOException {
1020    try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
1021      if (!admin.tableExists(PermissionStorage.ACL_TABLE_NAME)) {
1022        createACLTable(admin);
1023      } else {
1024        this.aclTabAvailable = true;
1025      }
1026    }
1027  }
1028
1029  /**
1030   * Create the ACL table
1031   */
1032  private static void createACLTable(Admin admin) throws IOException {
1033    /** Table descriptor for ACL table */
1034    ColumnFamilyDescriptor cfd =
1035      ColumnFamilyDescriptorBuilder.newBuilder(PermissionStorage.ACL_LIST_FAMILY).setMaxVersions(1)
1036        .setInMemory(true).setBlockCacheEnabled(true).setBlocksize(8 * 1024)
1037        .setBloomFilterType(BloomType.NONE).setScope(HConstants.REPLICATION_SCOPE_LOCAL).build();
1038    TableDescriptor td = TableDescriptorBuilder.newBuilder(PermissionStorage.ACL_TABLE_NAME)
1039      .setColumnFamily(cfd).build();
1040    admin.createTable(td);
1041  }
1042
1043  @Override
1044  public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1045    final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor) throws IOException {
1046    // Move this ACL check to SnapshotManager#checkPermissions as part of AC deprecation.
1047    requirePermission(ctx, "snapshot " + snapshot.getName(), hTableDescriptor.getTableName(), null,
1048      null, Permission.Action.ADMIN);
1049  }
1050
1051  @Override
1052  public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1053    final SnapshotDescription snapshot) throws IOException {
1054    User user = getActiveUser(ctx);
1055    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1056      // list it, if user is the owner of snapshot
1057      AuthResult result = AuthResult.allow("listSnapshot " + snapshot.getName(),
1058        "Snapshot owner check allowed", user, null, null, null);
1059      AccessChecker.logResult(result);
1060    } else {
1061      accessChecker.requirePermission(user, "listSnapshot " + snapshot.getName(), null,
1062        Action.ADMIN);
1063    }
1064  }
1065
1066  @Override
1067  public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1068    final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor) throws IOException {
1069    User user = getActiveUser(ctx);
1070    if (
1071      SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)
1072        && hTableDescriptor.getTableName().getNameAsString().equals(snapshot.getTableNameAsString())
1073    ) {
1074      // Snapshot owner is allowed to create a table with the same name as the snapshot he took
1075      AuthResult result = AuthResult.allow("cloneSnapshot " + snapshot.getName(),
1076        "Snapshot owner check allowed", user, null, hTableDescriptor.getTableName(), null);
1077      AccessChecker.logResult(result);
1078    } else if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1079      requireNamespacePermission(ctx, "cloneSnapshot",
1080        hTableDescriptor.getTableName().getNamespaceAsString(), Action.ADMIN);
1081    } else {
1082      accessChecker.requirePermission(user, "cloneSnapshot " + snapshot.getName(), null,
1083        Action.ADMIN);
1084    }
1085  }
1086
1087  @Override
1088  public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1089    final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor) throws IOException {
1090    User user = getActiveUser(ctx);
1091    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1092      accessChecker.requirePermission(user, "restoreSnapshot " + snapshot.getName(),
1093        hTableDescriptor.getTableName(), null, null, null, Permission.Action.ADMIN);
1094    } else {
1095      accessChecker.requirePermission(user, "restoreSnapshot " + snapshot.getName(), null,
1096        Action.ADMIN);
1097    }
1098  }
1099
1100  @Override
1101  public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1102    final SnapshotDescription snapshot) throws IOException {
1103    User user = getActiveUser(ctx);
1104    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1105      // Snapshot owner is allowed to delete the snapshot
1106      AuthResult result = AuthResult.allow("deleteSnapshot " + snapshot.getName(),
1107        "Snapshot owner check allowed", user, null, null, null);
1108      AccessChecker.logResult(result);
1109    } else {
1110      accessChecker.requirePermission(user, "deleteSnapshot " + snapshot.getName(), null,
1111        Action.ADMIN);
1112    }
1113  }
1114
1115  @Override
1116  public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1117    NamespaceDescriptor ns) throws IOException {
1118    requireGlobalPermission(ctx, "createNamespace", Action.ADMIN, ns.getName());
1119  }
1120
1121  @Override
1122  public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1123    String namespace) throws IOException {
1124    requireGlobalPermission(ctx, "deleteNamespace", Action.ADMIN, namespace);
1125  }
1126
1127  @Override
1128  public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1129    final String namespace) throws IOException {
1130    final Configuration conf = ctx.getEnvironment().getConfiguration();
1131    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1132      @Override
1133      public Void run() throws Exception {
1134        try (Table table =
1135          ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
1136          PermissionStorage.removeNamespacePermissions(conf, namespace, table);
1137        }
1138        return null;
1139      }
1140    });
1141    zkPermissionWatcher.deleteNamespaceACLNode(namespace);
1142    LOG.info(namespace + " entry deleted in " + PermissionStorage.ACL_TABLE_NAME + " table.");
1143  }
1144
1145  @Override
1146  public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1147    NamespaceDescriptor currentNsDesc, NamespaceDescriptor newNsDesc) throws IOException {
1148    // We require only global permission so that
1149    // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1150    requireGlobalPermission(ctx, "modifyNamespace", Action.ADMIN, newNsDesc.getName());
1151  }
1152
1153  @Override
1154  public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
1155    String namespace) throws IOException {
1156    requireNamespacePermission(ctx, "getNamespaceDescriptor", namespace, Action.ADMIN);
1157  }
1158
1159  @Override
1160  public void postListNamespaces(ObserverContext<MasterCoprocessorEnvironment> ctx,
1161    List<String> namespaces) throws IOException {
1162    /* always allow namespace listing */
1163  }
1164
1165  @Override
1166  public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1167    List<NamespaceDescriptor> descriptors) throws IOException {
1168    // Retains only those which passes authorization checks, as the checks weren't done as part
1169    // of preGetTableDescriptors.
1170    Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1171    User user = getActiveUser(ctx);
1172    while (itr.hasNext()) {
1173      NamespaceDescriptor desc = itr.next();
1174      try {
1175        accessChecker.requireNamespacePermission(user, "listNamespaces", desc.getName(), null,
1176          Action.ADMIN);
1177      } catch (AccessDeniedException e) {
1178        itr.remove();
1179      }
1180    }
1181  }
1182
1183  @Override
1184  public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1185    final TableName tableName) throws IOException {
1186    // Move this ACL check to MasterFlushTableProcedureManager#checkPermissions as part of AC
1187    // deprecation.
1188    requirePermission(ctx, "flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1189  }
1190
1191  @Override
1192  public void preSplitRegion(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1193    final TableName tableName, final byte[] splitRow) throws IOException {
1194    requirePermission(ctx, "split", tableName, null, null, Action.ADMIN);
1195  }
1196
1197  @Override
1198  public void preClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
1199    throws IOException {
1200    requirePermission(ctx, "clearDeadServers", Action.ADMIN);
1201  }
1202
1203  @Override
1204  public void preDecommissionRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
1205    List<ServerName> servers, boolean offload) throws IOException {
1206    requirePermission(ctx, "decommissionRegionServers", Action.ADMIN);
1207  }
1208
1209  @Override
1210  public void preListDecommissionedRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
1211    throws IOException {
1212    requirePermission(ctx, "listDecommissionedRegionServers", Action.READ);
1213  }
1214
1215  @Override
1216  public void preRecommissionRegionServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
1217    ServerName server, List<byte[]> encodedRegionNames) throws IOException {
1218    requirePermission(ctx, "recommissionRegionServers", Action.ADMIN);
1219  }
1220
1221  /* ---- RegionObserver implementation ---- */
1222
1223  @Override
1224  public void preOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c)
1225    throws IOException {
1226    RegionCoprocessorEnvironment env = c.getEnvironment();
1227    final Region region = env.getRegion();
1228    if (region == null) {
1229      LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1230    } else {
1231      RegionInfo regionInfo = region.getRegionInfo();
1232      if (regionInfo.getTable().isSystemTable()) {
1233        checkSystemOrSuperUser(getActiveUser(c));
1234      } else {
1235        requirePermission(c, "preOpen", Action.ADMIN);
1236      }
1237    }
1238  }
1239
1240  @Override
1241  public void postOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c) {
1242    RegionCoprocessorEnvironment env = c.getEnvironment();
1243    final Region region = env.getRegion();
1244    if (region == null) {
1245      LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1246      return;
1247    }
1248    if (PermissionStorage.isAclRegion(region)) {
1249      aclRegion = true;
1250      try {
1251        initialize(env);
1252      } catch (IOException ex) {
1253        // if we can't obtain permissions, it's better to fail
1254        // than perform checks incorrectly
1255        throw new RuntimeException("Failed to initialize permissions cache", ex);
1256      }
1257    } else {
1258      initialized = true;
1259    }
1260  }
1261
1262  @Override
1263  public void preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c,
1264    FlushLifeCycleTracker tracker) throws IOException {
1265    requirePermission(c, "flush", getTableName(c.getEnvironment()), null, null, Action.ADMIN,
1266      Action.CREATE);
1267  }
1268
1269  @Override
1270  public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c,
1271    Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
1272    CompactionRequest request) throws IOException {
1273    requirePermission(c, "compact", getTableName(c.getEnvironment()), null, null, Action.ADMIN,
1274      Action.CREATE);
1275    return scanner;
1276  }
1277
1278  private void internalPreRead(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
1279    final Query query, OpType opType) throws IOException {
1280    Filter filter = query.getFilter();
1281    // Don't wrap an AccessControlFilter
1282    if (filter != null && filter instanceof AccessControlFilter) {
1283      return;
1284    }
1285    User user = getActiveUser(c);
1286    RegionCoprocessorEnvironment env = c.getEnvironment();
1287    Map<byte[], ? extends Collection<byte[]>> families = null;
1288    switch (opType) {
1289      case GET:
1290      case EXISTS:
1291        families = ((Get) query).getFamilyMap();
1292        break;
1293      case SCAN:
1294        families = ((Scan) query).getFamilyMap();
1295        break;
1296      default:
1297        throw new RuntimeException("Unhandled operation " + opType);
1298    }
1299    AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1300    Region region = getRegion(env);
1301    TableName table = getTableName(region);
1302    Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1303    for (ColumnFamilyDescriptor hcd : region.getTableDescriptor().getColumnFamilies()) {
1304      cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1305    }
1306    if (!authResult.isAllowed()) {
1307      if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1308        // Old behavior: Scan with only qualifier checks if we have partial
1309        // permission. Backwards compatible behavior is to throw an
1310        // AccessDeniedException immediately if there are no grants for table
1311        // or CF or CF+qual. Only proceed with an injected filter if there are
1312        // grants for qualifiers. Otherwise we will fall through below and log
1313        // the result and throw an ADE. We may end up checking qualifier
1314        // grants three times (permissionGranted above, here, and in the
1315        // filter) but that's the price of backwards compatibility.
1316        if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1317          authResult.setAllowed(true);
1318          authResult.setReason("Access allowed with filter");
1319          // Only wrap the filter if we are enforcing authorizations
1320          if (authorizationEnabled) {
1321            Filter ourFilter = new AccessControlFilter(getAuthManager(), user, table,
1322              AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY, cfVsMaxVersions);
1323            // wrap any existing filter
1324            if (filter != null) {
1325              ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1326                Lists.newArrayList(ourFilter, filter));
1327            }
1328            switch (opType) {
1329              case GET:
1330              case EXISTS:
1331                ((Get) query).setFilter(ourFilter);
1332                break;
1333              case SCAN:
1334                ((Scan) query).setFilter(ourFilter);
1335                break;
1336              default:
1337                throw new RuntimeException("Unhandled operation " + opType);
1338            }
1339          }
1340        }
1341      } else {
1342        // New behavior: Any access we might be granted is more fine-grained
1343        // than whole table or CF. Simply inject a filter and return what is
1344        // allowed. We will not throw an AccessDeniedException. This is a
1345        // behavioral change since 0.96.
1346        authResult.setAllowed(true);
1347        authResult.setReason("Access allowed with filter");
1348        // Only wrap the filter if we are enforcing authorizations
1349        if (authorizationEnabled) {
1350          Filter ourFilter = new AccessControlFilter(getAuthManager(), user, table,
1351            AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1352          // wrap any existing filter
1353          if (filter != null) {
1354            ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1355              Lists.newArrayList(ourFilter, filter));
1356          }
1357          switch (opType) {
1358            case GET:
1359            case EXISTS:
1360              ((Get) query).setFilter(ourFilter);
1361              break;
1362            case SCAN:
1363              ((Scan) query).setFilter(ourFilter);
1364              break;
1365            default:
1366              throw new RuntimeException("Unhandled operation " + opType);
1367          }
1368        }
1369      }
1370    }
1371
1372    AccessChecker.logResult(authResult);
1373    if (authorizationEnabled && !authResult.isAllowed()) {
1374      throw new AccessDeniedException("Insufficient permissions for user '"
1375        + (user != null ? user.getShortName() : "null") + "' (table=" + table + ", action=READ)");
1376    }
1377  }
1378
1379  @Override
1380  public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
1381    final Get get, final List<Cell> result) throws IOException {
1382    internalPreRead(c, get, OpType.GET);
1383  }
1384
1385  @Override
1386  public boolean preExists(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
1387    final Get get, final boolean exists) throws IOException {
1388    internalPreRead(c, get, OpType.EXISTS);
1389    return exists;
1390  }
1391
1392  @Override
1393  public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> c, final Put put,
1394    final WALEdit edit, final Durability durability) throws IOException {
1395    User user = getActiveUser(c);
1396    checkForReservedTagPresence(user, put);
1397
1398    // Require WRITE permission to the table, CF, or top visible value, if any.
1399    // NOTE: We don't need to check the permissions for any earlier Puts
1400    // because we treat the ACLs in each Put as timestamped like any other
1401    // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1402    // change the ACL of any previous Put. This allows simple evolution of
1403    // security policy over time without requiring expensive updates.
1404    RegionCoprocessorEnvironment env = c.getEnvironment();
1405    Map<byte[], ? extends Collection<Cell>> families = put.getFamilyCellMap();
1406    AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1407    AccessChecker.logResult(authResult);
1408    if (!authResult.isAllowed()) {
1409      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1410        put.setAttribute(CHECK_COVERING_PERM, TRUE);
1411      } else if (authorizationEnabled) {
1412        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1413      }
1414    }
1415
1416    // Add cell ACLs from the operation to the cells themselves
1417    byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1418    if (bytes != null) {
1419      if (cellFeaturesEnabled) {
1420        addCellPermissions(bytes, put.getFamilyCellMap());
1421      } else {
1422        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1423      }
1424    }
1425  }
1426
1427  @Override
1428  public void postPut(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
1429    final Put put, final WALEdit edit, final Durability durability) {
1430    if (aclRegion) {
1431      updateACL(c.getEnvironment(), put.getFamilyCellMap());
1432    }
1433  }
1434
1435  @Override
1436  public void preDelete(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
1437    final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
1438    // An ACL on a delete is useless, we shouldn't allow it
1439    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1440      throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1441    }
1442    // Require WRITE permissions on all cells covered by the delete. Unlike
1443    // for Puts we need to check all visible prior versions, because a major
1444    // compaction could remove them. If the user doesn't have permission to
1445    // overwrite any of the visible versions ('visible' defined as not covered
1446    // by a tombstone already) then we have to disallow this operation.
1447    RegionCoprocessorEnvironment env = c.getEnvironment();
1448    Map<byte[], ? extends Collection<Cell>> families = delete.getFamilyCellMap();
1449    User user = getActiveUser(c);
1450    AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1451    AccessChecker.logResult(authResult);
1452    if (!authResult.isAllowed()) {
1453      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1454        delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1455      } else if (authorizationEnabled) {
1456        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1457      }
1458    }
1459  }
1460
1461  @Override
1462  public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c,
1463    MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1464    if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1465      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1466      User user = getActiveUser(c);
1467      for (int i = 0; i < miniBatchOp.size(); i++) {
1468        Mutation m = miniBatchOp.getOperation(i);
1469        if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1470          // We have a failure with table, cf and q perm checks and now giving a chance for cell
1471          // perm check
1472          OpType opType;
1473          long timestamp;
1474          if (m instanceof Put) {
1475            checkForReservedTagPresence(user, m);
1476            opType = OpType.PUT;
1477            timestamp = m.getTimestamp();
1478          } else if (m instanceof Delete) {
1479            opType = OpType.DELETE;
1480            timestamp = m.getTimestamp();
1481          } else if (m instanceof Increment) {
1482            opType = OpType.INCREMENT;
1483            timestamp = ((Increment) m).getTimeRange().getMax();
1484          } else if (m instanceof Append) {
1485            opType = OpType.APPEND;
1486            timestamp = ((Append) m).getTimeRange().getMax();
1487          } else {
1488            // If the operation type is not Put/Delete/Increment/Append, do nothing
1489            continue;
1490          }
1491          AuthResult authResult = null;
1492          if (
1493            checkCoveringPermission(user, opType, c.getEnvironment(), m.getRow(),
1494              m.getFamilyCellMap(), timestamp, Action.WRITE)
1495          ) {
1496            authResult = AuthResult.allow(opType.toString(), "Covering cell set", user,
1497              Action.WRITE, table, m.getFamilyCellMap());
1498          } else {
1499            authResult = AuthResult.deny(opType.toString(), "Covering cell set", user, Action.WRITE,
1500              table, m.getFamilyCellMap());
1501          }
1502          AccessChecker.logResult(authResult);
1503          if (authorizationEnabled && !authResult.isAllowed()) {
1504            throw new AccessDeniedException(
1505              "Insufficient permissions " + authResult.toContextString());
1506          }
1507        }
1508      }
1509    }
1510  }
1511
1512  @Override
1513  public void postDelete(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
1514    final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
1515    if (aclRegion) {
1516      updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1517    }
1518  }
1519
1520  @Override
1521  public boolean preCheckAndPut(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
1522    final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op,
1523    final ByteArrayComparable comparator, final Put put, final boolean result) throws IOException {
1524    User user = getActiveUser(c);
1525    checkForReservedTagPresence(user, put);
1526
1527    // Require READ and WRITE permissions on the table, CF, and KV to update
1528    RegionCoprocessorEnvironment env = c.getEnvironment();
1529    Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1530    AuthResult authResult =
1531      permissionGranted(OpType.CHECK_AND_PUT, user, env, families, Action.READ, Action.WRITE);
1532    AccessChecker.logResult(authResult);
1533    if (!authResult.isAllowed()) {
1534      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1535        put.setAttribute(CHECK_COVERING_PERM, TRUE);
1536      } else if (authorizationEnabled) {
1537        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1538      }
1539    }
1540
1541    byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1542    if (bytes != null) {
1543      if (cellFeaturesEnabled) {
1544        addCellPermissions(bytes, put.getFamilyCellMap());
1545      } else {
1546        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1547      }
1548    }
1549    return result;
1550  }
1551
1552  @Override
1553  public boolean preCheckAndPutAfterRowLock(
1554    final ObserverContext<? extends RegionCoprocessorEnvironment> c, final byte[] row,
1555    final byte[] family, final byte[] qualifier, final CompareOperator opp,
1556    final ByteArrayComparable comparator, final Put put, final boolean result) throws IOException {
1557    if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1558      // We had failure with table, cf and q perm checks and now giving a chance for cell
1559      // perm check
1560      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1561      Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1562      AuthResult authResult = null;
1563      User user = getActiveUser(c);
1564      if (
1565        checkCoveringPermission(user, OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1566          HConstants.LATEST_TIMESTAMP, Action.READ)
1567      ) {
1568        authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set", user,
1569          Action.READ, table, families);
1570      } else {
1571        authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set", user,
1572          Action.READ, table, families);
1573      }
1574      AccessChecker.logResult(authResult);
1575      if (authorizationEnabled && !authResult.isAllowed()) {
1576        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1577      }
1578    }
1579    return result;
1580  }
1581
1582  @Override
1583  public boolean preCheckAndDelete(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
1584    final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op,
1585    final ByteArrayComparable comparator, final Delete delete, final boolean result)
1586    throws IOException {
1587    // An ACL on a delete is useless, we shouldn't allow it
1588    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1589      throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " + delete.toString());
1590    }
1591    // Require READ and WRITE permissions on the table, CF, and the KV covered
1592    // by the delete
1593    RegionCoprocessorEnvironment env = c.getEnvironment();
1594    Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1595    User user = getActiveUser(c);
1596    AuthResult authResult =
1597      permissionGranted(OpType.CHECK_AND_DELETE, user, env, families, Action.READ, Action.WRITE);
1598    AccessChecker.logResult(authResult);
1599    if (!authResult.isAllowed()) {
1600      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1601        delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1602      } else if (authorizationEnabled) {
1603        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1604      }
1605    }
1606    return result;
1607  }
1608
1609  @Override
1610  public boolean preCheckAndDeleteAfterRowLock(
1611    final ObserverContext<? extends RegionCoprocessorEnvironment> c, final byte[] row,
1612    final byte[] family, final byte[] qualifier, final CompareOperator op,
1613    final ByteArrayComparable comparator, final Delete delete, final boolean result)
1614    throws IOException {
1615    if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1616      // We had failure with table, cf and q perm checks and now giving a chance for cell
1617      // perm check
1618      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1619      Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1620      AuthResult authResult = null;
1621      User user = getActiveUser(c);
1622      if (
1623        checkCoveringPermission(user, OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1624          HConstants.LATEST_TIMESTAMP, Action.READ)
1625      ) {
1626        authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set", user,
1627          Action.READ, table, families);
1628      } else {
1629        authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set", user,
1630          Action.READ, table, families);
1631      }
1632      AccessChecker.logResult(authResult);
1633      if (authorizationEnabled && !authResult.isAllowed()) {
1634        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1635      }
1636    }
1637    return result;
1638  }
1639
1640  @Override
1641  public Result preAppend(ObserverContext<? extends RegionCoprocessorEnvironment> c, Append append)
1642    throws IOException {
1643    User user = getActiveUser(c);
1644    checkForReservedTagPresence(user, append);
1645
1646    // Require WRITE permission to the table, CF, and the KV to be appended
1647    RegionCoprocessorEnvironment env = c.getEnvironment();
1648    Map<byte[], ? extends Collection<Cell>> families = append.getFamilyCellMap();
1649    AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1650    AccessChecker.logResult(authResult);
1651    if (!authResult.isAllowed()) {
1652      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1653        append.setAttribute(CHECK_COVERING_PERM, TRUE);
1654      } else if (authorizationEnabled) {
1655        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1656      }
1657    }
1658
1659    byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1660    if (bytes != null) {
1661      if (cellFeaturesEnabled) {
1662        addCellPermissions(bytes, append.getFamilyCellMap());
1663      } else {
1664        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1665      }
1666    }
1667
1668    return null;
1669  }
1670
1671  @Override
1672  public Result preIncrement(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
1673    final Increment increment) throws IOException {
1674    User user = getActiveUser(c);
1675    checkForReservedTagPresence(user, increment);
1676
1677    // Require WRITE permission to the table, CF, and the KV to be replaced by
1678    // the incremented value
1679    RegionCoprocessorEnvironment env = c.getEnvironment();
1680    Map<byte[], ? extends Collection<Cell>> families = increment.getFamilyCellMap();
1681    AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families, Action.WRITE);
1682    AccessChecker.logResult(authResult);
1683    if (!authResult.isAllowed()) {
1684      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1685        increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1686      } else if (authorizationEnabled) {
1687        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1688      }
1689    }
1690
1691    byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1692    if (bytes != null) {
1693      if (cellFeaturesEnabled) {
1694        addCellPermissions(bytes, increment.getFamilyCellMap());
1695      } else {
1696        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1697      }
1698    }
1699
1700    return null;
1701  }
1702
1703  @Override
1704  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
1705    ObserverContext<? extends RegionCoprocessorEnvironment> ctx, Mutation mutation,
1706    List<Pair<Cell, Cell>> cellPairs) throws IOException {
1707    // If the HFile version is insufficient to persist tags, we won't have any
1708    // work to do here
1709    if (!cellFeaturesEnabled || mutation.getACL() == null) {
1710      return cellPairs;
1711    }
1712    return cellPairs.stream()
1713      .map(pair -> new Pair<>(pair.getFirst(),
1714        createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond())))
1715      .collect(Collectors.toList());
1716  }
1717
1718  @Override
1719  public List<Pair<Cell, Cell>> postAppendBeforeWAL(
1720    ObserverContext<? extends RegionCoprocessorEnvironment> ctx, Mutation mutation,
1721    List<Pair<Cell, Cell>> cellPairs) throws IOException {
1722    // If the HFile version is insufficient to persist tags, we won't have any
1723    // work to do here
1724    if (!cellFeaturesEnabled || mutation.getACL() == null) {
1725      return cellPairs;
1726    }
1727    return cellPairs.stream()
1728      .map(pair -> new Pair<>(pair.getFirst(),
1729        createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond())))
1730      .collect(Collectors.toList());
1731  }
1732
1733  private Cell createNewCellWithTags(Mutation mutation, Cell oldCell, Cell newCell) {
1734    // As Increment and Append operations have already copied the tags of oldCell to the newCell,
1735    // there is no need to rewrite them again. Just extract non-acl tags of newCell if we need to
1736    // add a new acl tag for the cell. Actually, oldCell is useless here.
1737    List<Tag> tags = Lists.newArrayList();
1738    ExtendedCell newExtendedCell = (ExtendedCell) newCell;
1739    if (newExtendedCell != null) {
1740      Iterator<Tag> tagIterator = PrivateCellUtil.tagsIterator(newExtendedCell);
1741      while (tagIterator.hasNext()) {
1742        Tag tag = tagIterator.next();
1743        if (tag.getType() != PermissionStorage.ACL_TAG_TYPE) {
1744          // Not an ACL tag, just carry it through
1745          if (LOG.isTraceEnabled()) {
1746            LOG.trace("Carrying forward tag from " + newCell + ": type " + tag.getType()
1747              + " length " + tag.getValueLength());
1748          }
1749          tags.add(tag);
1750        }
1751      }
1752    }
1753
1754    // We have checked the ACL tag of mutation is not null.
1755    // So that the tags could not be empty.
1756    tags.add(new ArrayBackedTag(PermissionStorage.ACL_TAG_TYPE, mutation.getACL()));
1757    return PrivateCellUtil.createCell(newExtendedCell, tags);
1758  }
1759
1760  @Override
1761  public void preScannerOpen(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
1762    final Scan scan) throws IOException {
1763    internalPreRead(c, scan, OpType.SCAN);
1764  }
1765
1766  @Override
1767  public RegionScanner postScannerOpen(
1768    final ObserverContext<? extends RegionCoprocessorEnvironment> c, final Scan scan,
1769    final RegionScanner s) throws IOException {
1770    User user = getActiveUser(c);
1771    if (user != null && user.getShortName() != null) {
1772      // store reference to scanner owner for later checks
1773      scannerOwners.put(s, user.getShortName());
1774    }
1775    return s;
1776  }
1777
1778  @Override
1779  public boolean preScannerNext(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
1780    final InternalScanner s, final List<Result> result, final int limit, final boolean hasNext)
1781    throws IOException {
1782    requireScannerOwner(s);
1783    return hasNext;
1784  }
1785
1786  @Override
1787  public void preScannerClose(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
1788    final InternalScanner s) throws IOException {
1789    requireScannerOwner(s);
1790  }
1791
1792  @Override
1793  public void postScannerClose(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
1794    final InternalScanner s) throws IOException {
1795    // clean up any associated owner mapping
1796    scannerOwners.remove(s);
1797  }
1798
1799  /**
1800   * Verify, when servicing an RPC, that the caller is the scanner owner. If so, we assume that
1801   * access control is correctly enforced based on the checks performed in preScannerOpen()
1802   */
1803  private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
1804    if (!RpcServer.isInRpcCallContext()) {
1805      return;
1806    }
1807    String requestUserName = RpcServer.getRequestUserName().orElse(null);
1808    String owner = scannerOwners.get(s);
1809    if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
1810      throw new AccessDeniedException("User '" + requestUserName + "' is not the scanner owner!");
1811    }
1812  }
1813
1814  /**
1815   * Verifies user has CREATE or ADMIN privileges on the Column Families involved in the
1816   * bulkLoadHFile request. Specific Column Write privileges are presently ignored.
1817   */
1818  @Override
1819  public void preBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
1820    List<Pair<byte[], String>> familyPaths) throws IOException {
1821    User user = getActiveUser(ctx);
1822    for (Pair<byte[], String> el : familyPaths) {
1823      accessChecker.requirePermission(user, "preBulkLoadHFile",
1824        ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), el.getFirst(), null,
1825        null, Action.ADMIN, Action.CREATE);
1826    }
1827  }
1828
1829  /**
1830   * Authorization check for SecureBulkLoadProtocol.prepareBulkLoad()
1831   * @param ctx the context
1832   */
1833  @Override
1834  public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
1835    throws IOException {
1836    requireAccess(ctx, "prePrepareBulkLoad",
1837      ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.ADMIN,
1838      Action.CREATE);
1839  }
1840
1841  /**
1842   * Authorization security check for SecureBulkLoadProtocol.cleanupBulkLoad()
1843   * @param ctx the context
1844   */
1845  @Override
1846  public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
1847    throws IOException {
1848    requireAccess(ctx, "preCleanupBulkLoad",
1849      ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.ADMIN,
1850      Action.CREATE);
1851  }
1852
1853  /* ---- EndpointObserver implementation ---- */
1854
1855  @Override
1856  public Message preEndpointInvocation(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
1857    Service service, String methodName, Message request) throws IOException {
1858    // Don't intercept calls to our own AccessControlService, we check for
1859    // appropriate permissions in the service handlers
1860    if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
1861      requirePermission(ctx,
1862        "invoke(" + service.getDescriptorForType().getName() + "." + methodName + ")",
1863        getTableName(ctx.getEnvironment()), null, null, Action.EXEC);
1864    }
1865    return request;
1866  }
1867
1868  @Override
1869  public void postEndpointInvocation(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
1870    Service service, String methodName, Message request, Message.Builder responseBuilder)
1871    throws IOException {
1872  }
1873
1874  /* ---- Protobuf AccessControlService implementation ---- */
1875
1876  /**
1877   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1878   *             {@link Admin#grant(UserPermission, boolean)} instead.
1879   * @see Admin#grant(UserPermission, boolean)
1880   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21739">HBASE-21739</a>
1881   */
1882  @Deprecated
1883  @Override
1884  public void grant(RpcController controller, AccessControlProtos.GrantRequest request,
1885    RpcCallback<AccessControlProtos.GrantResponse> done) {
1886    final UserPermission perm = AccessControlUtil.toUserPermission(request.getUserPermission());
1887    AccessControlProtos.GrantResponse response = null;
1888    try {
1889      // verify it's only running at .acl.
1890      if (aclRegion) {
1891        if (!initialized) {
1892          throw new CoprocessorException("AccessController not yet initialized");
1893        }
1894        User caller = RpcServer.getRequestUser().orElse(null);
1895        if (LOG.isDebugEnabled()) {
1896          LOG.debug("Received request from {} to grant access permission {}", caller.getName(),
1897            perm.toString());
1898        }
1899        preGrantOrRevoke(caller, "grant", perm);
1900
1901        // regionEnv is set at #start. Hopefully not null at this point.
1902        regionEnv.getConnection().getAdmin().grant(
1903          new UserPermission(perm.getUser(), perm.getPermission()),
1904          request.getMergeExistingPermissions());
1905        if (AUDITLOG.isTraceEnabled()) {
1906          // audit log should store permission changes in addition to auth results
1907          AUDITLOG.trace("Granted permission " + perm.toString());
1908        }
1909      } else {
1910        throw new CoprocessorException(AccessController.class,
1911          "This method " + "can only execute at " + PermissionStorage.ACL_TABLE_NAME + " table.");
1912      }
1913      response = AccessControlProtos.GrantResponse.getDefaultInstance();
1914    } catch (IOException ioe) {
1915      // pass exception back up
1916      CoprocessorRpcUtils.setControllerException(controller, ioe);
1917    }
1918    done.run(response);
1919  }
1920
1921  /**
1922   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use {@link Admin#revoke(UserPermission)}
1923   *             instead.
1924   * @see Admin#revoke(UserPermission)
1925   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21739">HBASE-21739</a>
1926   */
1927  @Deprecated
1928  @Override
1929  public void revoke(RpcController controller, AccessControlProtos.RevokeRequest request,
1930    RpcCallback<AccessControlProtos.RevokeResponse> done) {
1931    final UserPermission perm = AccessControlUtil.toUserPermission(request.getUserPermission());
1932    AccessControlProtos.RevokeResponse response = null;
1933    try {
1934      // only allowed to be called on _acl_ region
1935      if (aclRegion) {
1936        if (!initialized) {
1937          throw new CoprocessorException("AccessController not yet initialized");
1938        }
1939        User caller = RpcServer.getRequestUser().orElse(null);
1940        if (LOG.isDebugEnabled()) {
1941          LOG.debug("Received request from {} to revoke access permission {}",
1942            caller.getShortName(), perm.toString());
1943        }
1944        preGrantOrRevoke(caller, "revoke", perm);
1945        // regionEnv is set at #start. Hopefully not null here.
1946        regionEnv.getConnection().getAdmin()
1947          .revoke(new UserPermission(perm.getUser(), perm.getPermission()));
1948        if (AUDITLOG.isTraceEnabled()) {
1949          // audit log should record all permission changes
1950          AUDITLOG.trace("Revoked permission " + perm.toString());
1951        }
1952      } else {
1953        throw new CoprocessorException(AccessController.class,
1954          "This method " + "can only execute at " + PermissionStorage.ACL_TABLE_NAME + " table.");
1955      }
1956      response = AccessControlProtos.RevokeResponse.getDefaultInstance();
1957    } catch (IOException ioe) {
1958      // pass exception back up
1959      CoprocessorRpcUtils.setControllerException(controller, ioe);
1960    }
1961    done.run(response);
1962  }
1963
1964  /**
1965   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1966   *             {@link Admin#getUserPermissions(GetUserPermissionsRequest)} instead.
1967   * @see Admin#getUserPermissions(GetUserPermissionsRequest)
1968   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21911">HBASE-21911</a>
1969   */
1970  @Deprecated
1971  @Override
1972  public void getUserPermissions(RpcController controller,
1973    AccessControlProtos.GetUserPermissionsRequest request,
1974    RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
1975    AccessControlProtos.GetUserPermissionsResponse response = null;
1976    try {
1977      // only allowed to be called on _acl_ region
1978      if (aclRegion) {
1979        if (!initialized) {
1980          throw new CoprocessorException("AccessController not yet initialized");
1981        }
1982        User caller = RpcServer.getRequestUser().orElse(null);
1983        final String userName = request.hasUserName() ? request.getUserName().toStringUtf8() : null;
1984        final String namespace =
1985          request.hasNamespaceName() ? request.getNamespaceName().toStringUtf8() : null;
1986        final TableName table =
1987          request.hasTableName() ? ProtobufUtil.toTableName(request.getTableName()) : null;
1988        final byte[] cf =
1989          request.hasColumnFamily() ? request.getColumnFamily().toByteArray() : null;
1990        final byte[] cq =
1991          request.hasColumnQualifier() ? request.getColumnQualifier().toByteArray() : null;
1992        preGetUserPermissions(caller, userName, namespace, table, cf, cq);
1993        GetUserPermissionsRequest getUserPermissionsRequest = null;
1994        if (request.getType() == AccessControlProtos.Permission.Type.Table) {
1995          getUserPermissionsRequest = GetUserPermissionsRequest.newBuilder(table).withFamily(cf)
1996            .withQualifier(cq).withUserName(userName).build();
1997        } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
1998          getUserPermissionsRequest =
1999            GetUserPermissionsRequest.newBuilder(namespace).withUserName(userName).build();
2000        } else {
2001          getUserPermissionsRequest =
2002            GetUserPermissionsRequest.newBuilder().withUserName(userName).build();
2003        }
2004        List<UserPermission> perms =
2005          regionEnv.getConnection().getAdmin().getUserPermissions(getUserPermissionsRequest);
2006        response = AccessControlUtil.buildGetUserPermissionsResponse(perms);
2007      } else {
2008        throw new CoprocessorException(AccessController.class,
2009          "This method " + "can only execute at " + PermissionStorage.ACL_TABLE_NAME + " table.");
2010      }
2011    } catch (IOException ioe) {
2012      // pass exception back up
2013      CoprocessorRpcUtils.setControllerException(controller, ioe);
2014    }
2015    done.run(response);
2016  }
2017
2018  /**
2019   * @deprecated since 2.2.0 and will be removed 4.0.0. Use {@link Admin#hasUserPermissions(List)}
2020   *             instead.
2021   * @see Admin#hasUserPermissions(List)
2022   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22117">HBASE-22117</a>
2023   */
2024  @Deprecated
2025  @Override
2026  public void checkPermissions(RpcController controller,
2027    AccessControlProtos.CheckPermissionsRequest request,
2028    RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2029    AccessControlProtos.CheckPermissionsResponse response = null;
2030    try {
2031      User user = RpcServer.getRequestUser().orElse(null);
2032      TableName tableName = regionEnv.getRegion().getTableDescriptor().getTableName();
2033      List<Permission> permissions = new ArrayList<>();
2034      for (int i = 0; i < request.getPermissionCount(); i++) {
2035        Permission permission = AccessControlUtil.toPermission(request.getPermission(i));
2036        permissions.add(permission);
2037        if (permission instanceof TablePermission) {
2038          TablePermission tperm = (TablePermission) permission;
2039          if (!tperm.getTableName().equals(tableName)) {
2040            throw new CoprocessorException(AccessController.class,
2041              String.format(
2042                "This method can only execute at the table specified in "
2043                  + "TablePermission. Table of the region:%s , requested table:%s",
2044                tableName, tperm.getTableName()));
2045          }
2046        }
2047      }
2048      for (Permission permission : permissions) {
2049        boolean hasPermission =
2050          accessChecker.hasUserPermission(user, "checkPermissions", permission);
2051        if (!hasPermission) {
2052          throw new AccessDeniedException("Insufficient permissions " + permission.toString());
2053        }
2054      }
2055      response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2056    } catch (IOException ioe) {
2057      CoprocessorRpcUtils.setControllerException(controller, ioe);
2058    }
2059    done.run(response);
2060  }
2061
2062  private Region getRegion(RegionCoprocessorEnvironment e) {
2063    return e.getRegion();
2064  }
2065
2066  private TableName getTableName(RegionCoprocessorEnvironment e) {
2067    Region region = e.getRegion();
2068    if (region != null) {
2069      return getTableName(region);
2070    }
2071    return null;
2072  }
2073
2074  private TableName getTableName(Region region) {
2075    RegionInfo regionInfo = region.getRegionInfo();
2076    if (regionInfo != null) {
2077      return regionInfo.getTable();
2078    }
2079    return null;
2080  }
2081
2082  @Override
2083  public void preClose(ObserverContext<? extends RegionCoprocessorEnvironment> c,
2084    boolean abortRequested) throws IOException {
2085    requirePermission(c, "preClose", Action.ADMIN);
2086  }
2087
2088  private void checkSystemOrSuperUser(User activeUser) throws IOException {
2089    // No need to check if we're not going to throw
2090    if (!authorizationEnabled) {
2091      return;
2092    }
2093    if (!Superusers.isSuperUser(activeUser)) {
2094      throw new AccessDeniedException(
2095        "User '" + (activeUser != null ? activeUser.getShortName() : "null")
2096          + "' is not system or super user.");
2097    }
2098  }
2099
2100  @Override
2101  public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2102    throws IOException {
2103    requirePermission(ctx, "preStopRegionServer", Action.ADMIN);
2104  }
2105
2106  private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family, byte[] qualifier) {
2107    if (family == null) {
2108      return null;
2109    }
2110
2111    Map<byte[], Collection<byte[]>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2112    familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2113    return familyMap;
2114  }
2115
2116  @Override
2117  public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2118    List<TableName> tableNamesList, List<TableDescriptor> descriptors, String regex)
2119    throws IOException {
2120    // We are delegating the authorization check to postGetTableDescriptors as we don't have
2121    // any concrete set of table names when a regex is present or the full list is requested.
2122    if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2123      // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2124      // request can be granted.
2125      try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
2126        for (TableName tableName : tableNamesList) {
2127          // Skip checks for a table that does not exist
2128          if (!admin.tableExists(tableName)) {
2129            continue;
2130          }
2131          requirePermission(ctx, "getTableDescriptors", tableName, null, null, Action.ADMIN,
2132            Action.CREATE);
2133        }
2134      }
2135    }
2136  }
2137
2138  @Override
2139  public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2140    List<TableName> tableNamesList, List<TableDescriptor> descriptors, String regex)
2141    throws IOException {
2142    // Skipping as checks in this case are already done by preGetTableDescriptors.
2143    if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2144      return;
2145    }
2146
2147    // Retains only those which passes authorization checks, as the checks weren't done as part
2148    // of preGetTableDescriptors.
2149    Iterator<TableDescriptor> itr = descriptors.iterator();
2150    while (itr.hasNext()) {
2151      TableDescriptor htd = itr.next();
2152      try {
2153        requirePermission(ctx, "getTableDescriptors", htd.getTableName(), null, null, Action.ADMIN,
2154          Action.CREATE);
2155      } catch (AccessDeniedException e) {
2156        itr.remove();
2157      }
2158    }
2159  }
2160
2161  @Override
2162  public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2163    List<TableDescriptor> descriptors, String regex) throws IOException {
2164    // Retains only those which passes authorization checks.
2165    Iterator<TableDescriptor> itr = descriptors.iterator();
2166    while (itr.hasNext()) {
2167      TableDescriptor htd = itr.next();
2168      try {
2169        requireAccess(ctx, "getTableNames", htd.getTableName(), Action.values());
2170      } catch (AccessDeniedException e) {
2171        itr.remove();
2172      }
2173    }
2174  }
2175
2176  @Override
2177  public void preMergeRegions(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2178    final RegionInfo[] regionsToMerge) throws IOException {
2179    requirePermission(ctx, "mergeRegions", regionsToMerge[0].getTable(), null, null, Action.ADMIN);
2180  }
2181
2182  @Override
2183  public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2184    throws IOException {
2185    requirePermission(ctx, "preRollLogWriterRequest", Permission.Action.ADMIN);
2186  }
2187
2188  @Override
2189  public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2190    throws IOException {
2191  }
2192
2193  @Override
2194  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2195    final String userName, final GlobalQuotaSettings quotas) throws IOException {
2196    requirePermission(ctx, "setUserQuota", Action.ADMIN);
2197  }
2198
2199  @Override
2200  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2201    final String userName, final TableName tableName, final GlobalQuotaSettings quotas)
2202    throws IOException {
2203    requirePermission(ctx, "setUserTableQuota", tableName, null, null, Action.ADMIN);
2204  }
2205
2206  @Override
2207  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2208    final String userName, final String namespace, final GlobalQuotaSettings quotas)
2209    throws IOException {
2210    requirePermission(ctx, "setUserNamespaceQuota", Action.ADMIN);
2211  }
2212
2213  @Override
2214  public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2215    final TableName tableName, final GlobalQuotaSettings quotas) throws IOException {
2216    requirePermission(ctx, "setTableQuota", tableName, null, null, Action.ADMIN);
2217  }
2218
2219  @Override
2220  public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2221    final String namespace, final GlobalQuotaSettings quotas) throws IOException {
2222    requirePermission(ctx, "setNamespaceQuota", Action.ADMIN);
2223  }
2224
2225  @Override
2226  public void preSetRegionServerQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
2227    final String regionServer, GlobalQuotaSettings quotas) throws IOException {
2228    requirePermission(ctx, "setRegionServerQuota", Action.ADMIN);
2229  }
2230
2231  @Override
2232  public ReplicationEndpoint postCreateReplicationEndPoint(
2233    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2234    return endpoint;
2235  }
2236
2237  @Override
2238  public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2239    throws IOException {
2240    requirePermission(ctx, "replicateLogEntries", Action.WRITE);
2241  }
2242
2243  @Override
2244  public void preClearCompactionQueues(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2245    throws IOException {
2246    requirePermission(ctx, "preClearCompactionQueues", Permission.Action.ADMIN);
2247  }
2248
2249  @Override
2250  public void preAddReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2251    String peerId, ReplicationPeerConfig peerConfig) throws IOException {
2252    requirePermission(ctx, "addReplicationPeer", Action.ADMIN);
2253  }
2254
2255  @Override
2256  public void preRemoveReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2257    String peerId) throws IOException {
2258    requirePermission(ctx, "removeReplicationPeer", Action.ADMIN);
2259  }
2260
2261  @Override
2262  public void preEnableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2263    String peerId) throws IOException {
2264    requirePermission(ctx, "enableReplicationPeer", Action.ADMIN);
2265  }
2266
2267  @Override
2268  public void preDisableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2269    String peerId) throws IOException {
2270    requirePermission(ctx, "disableReplicationPeer", Action.ADMIN);
2271  }
2272
2273  @Override
2274  public void preGetReplicationPeerConfig(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2275    String peerId) throws IOException {
2276    requirePermission(ctx, "getReplicationPeerConfig", Action.ADMIN);
2277  }
2278
2279  @Override
2280  public void preUpdateReplicationPeerConfig(
2281    final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
2282    ReplicationPeerConfig peerConfig) throws IOException {
2283    requirePermission(ctx, "updateReplicationPeerConfig", Action.ADMIN);
2284  }
2285
2286  @Override
2287  public void preTransitReplicationPeerSyncReplicationState(
2288    final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
2289    SyncReplicationState clusterState) throws IOException {
2290    requirePermission(ctx, "transitSyncReplicationPeerState", Action.ADMIN);
2291  }
2292
2293  @Override
2294  public void preListReplicationPeers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2295    String regex) throws IOException {
2296    requirePermission(ctx, "listReplicationPeers", Action.ADMIN);
2297  }
2298
2299  @Override
2300  public void preRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace,
2301    TableName tableName, RegionInfo[] regionInfos, String description) throws IOException {
2302    // There are operations in the CREATE and ADMIN domain which may require lock, READ
2303    // or WRITE. So for any lock request, we check for these two perms irrespective of lock type.
2304    String reason = String.format("Description=%s", description);
2305    checkLockPermissions(ctx, namespace, tableName, regionInfos, reason);
2306  }
2307
2308  @Override
2309  public void preLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
2310    TableName tableName, String description) throws IOException {
2311    checkLockPermissions(ctx, null, tableName, null, description);
2312  }
2313
2314  @Override
2315  public void preExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2316    throws IOException {
2317    checkSystemOrSuperUser(getActiveUser(ctx));
2318  }
2319
2320  @Override
2321  public void preSwitchRpcThrottle(ObserverContext<MasterCoprocessorEnvironment> ctx,
2322    boolean enable) throws IOException {
2323    requirePermission(ctx, "switchRpcThrottle", Action.ADMIN);
2324  }
2325
2326  @Override
2327  public void preIsRpcThrottleEnabled(ObserverContext<MasterCoprocessorEnvironment> ctx)
2328    throws IOException {
2329    requirePermission(ctx, "isRpcThrottleEnabled", Action.ADMIN);
2330  }
2331
2332  @Override
2333  public void preSwitchExceedThrottleQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
2334    boolean enable) throws IOException {
2335    requirePermission(ctx, "switchExceedThrottleQuota", Action.ADMIN);
2336  }
2337
2338  /**
2339   * Returns the active user to which authorization checks should be applied. If we are in the
2340   * context of an RPC call, the remote user is used, otherwise the currently logged in user is
2341   * used.
2342   */
2343  private User getActiveUser(ObserverContext<?> ctx) throws IOException {
2344    // for non-rpc handling, fallback to system user
2345    Optional<User> optionalUser = ctx.getCaller();
2346    if (optionalUser.isPresent()) {
2347      return optionalUser.get();
2348    }
2349    return userProvider.getCurrent();
2350  }
2351
2352  /**
2353   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
2354   *             {@link Admin#hasUserPermissions(String, List)} instead.
2355   * @see Admin#hasUserPermissions(String, List)
2356   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22117">HBASE-22117</a>
2357   */
2358  @Deprecated
2359  @Override
2360  public void hasPermission(RpcController controller, HasPermissionRequest request,
2361    RpcCallback<HasPermissionResponse> done) {
2362    // Converts proto to a TablePermission object.
2363    TablePermission tPerm = AccessControlUtil.toTablePermission(request.getTablePermission());
2364    // Check input user name
2365    if (!request.hasUserName()) {
2366      throw new IllegalStateException("Input username cannot be empty");
2367    }
2368    final String inputUserName = request.getUserName().toStringUtf8();
2369    AccessControlProtos.HasPermissionResponse response = null;
2370    try {
2371      User caller = RpcServer.getRequestUser().orElse(null);
2372      List<Permission> permissions = Lists.newArrayList(tPerm);
2373      preHasUserPermissions(caller, inputUserName, permissions);
2374      boolean hasPermission =
2375        regionEnv.getConnection().getAdmin().hasUserPermissions(inputUserName, permissions).get(0);
2376      response = ResponseConverter.buildHasPermissionResponse(hasPermission);
2377    } catch (IOException ioe) {
2378      ResponseConverter.setControllerException(controller, ioe);
2379    }
2380    done.run(response);
2381  }
2382
2383  @Override
2384  public void preGrant(ObserverContext<MasterCoprocessorEnvironment> ctx,
2385    UserPermission userPermission, boolean mergeExistingPermissions) throws IOException {
2386    preGrantOrRevoke(getActiveUser(ctx), "grant", userPermission);
2387  }
2388
2389  @Override
2390  public void preRevoke(ObserverContext<MasterCoprocessorEnvironment> ctx,
2391    UserPermission userPermission) throws IOException {
2392    preGrantOrRevoke(getActiveUser(ctx), "revoke", userPermission);
2393  }
2394
2395  private void preGrantOrRevoke(User caller, String request, UserPermission userPermission)
2396    throws IOException {
2397    switch (userPermission.getPermission().scope) {
2398      case GLOBAL:
2399        accessChecker.requireGlobalPermission(caller, request, Action.ADMIN, "");
2400        break;
2401      case NAMESPACE:
2402        NamespacePermission namespacePerm = (NamespacePermission) userPermission.getPermission();
2403        accessChecker.requireNamespacePermission(caller, request, namespacePerm.getNamespace(),
2404          null, Action.ADMIN);
2405        break;
2406      case TABLE:
2407        TablePermission tablePerm = (TablePermission) userPermission.getPermission();
2408        accessChecker.requirePermission(caller, request, tablePerm.getTableName(),
2409          tablePerm.getFamily(), tablePerm.getQualifier(), null, Action.ADMIN);
2410        break;
2411      default:
2412    }
2413    if (!Superusers.isSuperUser(caller)) {
2414      accessChecker.performOnSuperuser(request, caller, userPermission.getUser());
2415    }
2416  }
2417
2418  @Override
2419  public void preGetUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
2420    String userName, String namespace, TableName tableName, byte[] family, byte[] qualifier)
2421    throws IOException {
2422    preGetUserPermissions(getActiveUser(ctx), userName, namespace, tableName, family, qualifier);
2423  }
2424
2425  private void preGetUserPermissions(User caller, String userName, String namespace,
2426    TableName tableName, byte[] family, byte[] qualifier) throws IOException {
2427    if (tableName != null) {
2428      accessChecker.requirePermission(caller, "getUserPermissions", tableName, family, qualifier,
2429        userName, Action.ADMIN);
2430    } else if (namespace != null) {
2431      accessChecker.requireNamespacePermission(caller, "getUserPermissions", namespace, userName,
2432        Action.ADMIN);
2433    } else {
2434      accessChecker.requirePermission(caller, "getUserPermissions", userName, Action.ADMIN);
2435    }
2436  }
2437
2438  @Override
2439  public void preHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
2440    String userName, List<Permission> permissions) throws IOException {
2441    preHasUserPermissions(getActiveUser(ctx), userName, permissions);
2442  }
2443
2444  private void preHasUserPermissions(User caller, String userName, List<Permission> permissions)
2445    throws IOException {
2446    String request = "hasUserPermissions";
2447    for (Permission permission : permissions) {
2448      if (!caller.getShortName().equals(userName)) {
2449        // User should have admin privilege if checking permission for other users
2450        if (permission instanceof TablePermission) {
2451          TablePermission tPerm = (TablePermission) permission;
2452          accessChecker.requirePermission(caller, request, tPerm.getTableName(), tPerm.getFamily(),
2453            tPerm.getQualifier(), userName, Action.ADMIN);
2454        } else if (permission instanceof NamespacePermission) {
2455          NamespacePermission nsPerm = (NamespacePermission) permission;
2456          accessChecker.requireNamespacePermission(caller, request, nsPerm.getNamespace(), userName,
2457            Action.ADMIN);
2458        } else {
2459          accessChecker.requirePermission(caller, request, userName, Action.ADMIN);
2460        }
2461      } else {
2462        // User don't need ADMIN privilege for self check.
2463        // Setting action as null in AuthResult to display empty action in audit log
2464        AuthResult result;
2465        if (permission instanceof TablePermission) {
2466          TablePermission tPerm = (TablePermission) permission;
2467          result = AuthResult.allow(request, "Self user validation allowed", caller, null,
2468            tPerm.getTableName(), tPerm.getFamily(), tPerm.getQualifier());
2469        } else if (permission instanceof NamespacePermission) {
2470          NamespacePermission nsPerm = (NamespacePermission) permission;
2471          result = AuthResult.allow(request, "Self user validation allowed", caller, null,
2472            nsPerm.getNamespace());
2473        } else {
2474          result = AuthResult.allow(request, "Self user validation allowed", caller, null, null,
2475            null, null);
2476        }
2477        AccessChecker.logResult(result);
2478      }
2479    }
2480  }
2481
2482  @Override
2483  public void preMoveServersAndTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
2484    Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException {
2485    accessChecker.requirePermission(getActiveUser(ctx), "moveServersAndTables", null,
2486      Permission.Action.ADMIN);
2487  }
2488
2489  @Override
2490  public void preMoveServers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2491    Set<Address> servers, String targetGroup) throws IOException {
2492    accessChecker.requirePermission(getActiveUser(ctx), "moveServers", null,
2493      Permission.Action.ADMIN);
2494  }
2495
2496  @Override
2497  public void preMoveTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
2498    Set<TableName> tables, String targetGroup) throws IOException {
2499    accessChecker.requirePermission(getActiveUser(ctx), "moveTables", null,
2500      Permission.Action.ADMIN);
2501  }
2502
2503  @Override
2504  public void preAddRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx, String name)
2505    throws IOException {
2506    accessChecker.requirePermission(getActiveUser(ctx), "addRSGroup", null,
2507      Permission.Action.ADMIN);
2508  }
2509
2510  @Override
2511  public void preRemoveRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx, String name)
2512    throws IOException {
2513    accessChecker.requirePermission(getActiveUser(ctx), "removeRSGroup", null,
2514      Permission.Action.ADMIN);
2515  }
2516
2517  @Override
2518  public void preBalanceRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx, String groupName,
2519    BalanceRequest request) throws IOException {
2520    accessChecker.requirePermission(getActiveUser(ctx), "balanceRSGroup", null,
2521      Permission.Action.ADMIN);
2522  }
2523
2524  @Override
2525  public void preRemoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
2526    Set<Address> servers) throws IOException {
2527    accessChecker.requirePermission(getActiveUser(ctx), "removeServers", null,
2528      Permission.Action.ADMIN);
2529  }
2530
2531  @Override
2532  public void preGetRSGroupInfo(ObserverContext<MasterCoprocessorEnvironment> ctx, String groupName)
2533    throws IOException {
2534    accessChecker.requirePermission(getActiveUser(ctx), "getRSGroupInfo", null,
2535      Permission.Action.ADMIN);
2536  }
2537
2538  @Override
2539  public void preGetRSGroupInfoOfTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
2540    TableName tableName) throws IOException {
2541    accessChecker.requirePermission(getActiveUser(ctx), "getRSGroupInfoOfTable", null,
2542      Permission.Action.ADMIN);
2543    // todo: should add check for table existence
2544  }
2545
2546  @Override
2547  public void preListRSGroups(ObserverContext<MasterCoprocessorEnvironment> ctx)
2548    throws IOException {
2549    accessChecker.requirePermission(getActiveUser(ctx), "listRSGroups", null,
2550      Permission.Action.ADMIN);
2551  }
2552
2553  @Override
2554  public void preListTablesInRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2555    String groupName) throws IOException {
2556    accessChecker.requirePermission(getActiveUser(ctx), "listTablesInRSGroup", null,
2557      Permission.Action.ADMIN);
2558  }
2559
2560  @Override
2561  public void preGetConfiguredNamespacesAndTablesInRSGroup(
2562    ObserverContext<MasterCoprocessorEnvironment> ctx, String groupName) throws IOException {
2563    accessChecker.requirePermission(getActiveUser(ctx), "getConfiguredNamespacesAndTablesInRSGroup",
2564      null, Permission.Action.ADMIN);
2565  }
2566
2567  @Override
2568  public void preGetRSGroupInfoOfServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
2569    Address server) throws IOException {
2570    accessChecker.requirePermission(getActiveUser(ctx), "getRSGroupInfoOfServer", null,
2571      Permission.Action.ADMIN);
2572  }
2573
2574  @Override
2575  public void preRenameRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx, String oldName,
2576    String newName) throws IOException {
2577    accessChecker.requirePermission(getActiveUser(ctx), "renameRSGroup", null,
2578      Permission.Action.ADMIN);
2579  }
2580
2581  @Override
2582  public void preUpdateRSGroupConfig(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2583    final String groupName, final Map<String, String> configuration) throws IOException {
2584    accessChecker.requirePermission(getActiveUser(ctx), "updateRSGroupConfig", null,
2585      Permission.Action.ADMIN);
2586  }
2587
2588  @Override
2589  public void preClearRegionBlockCache(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2590    throws IOException {
2591    accessChecker.requirePermission(getActiveUser(ctx), "clearRegionBlockCache", null,
2592      Permission.Action.ADMIN);
2593  }
2594
2595  @Override
2596  public void preUpdateRegionServerConfiguration(
2597    ObserverContext<RegionServerCoprocessorEnvironment> ctx, Configuration preReloadConf)
2598    throws IOException {
2599    accessChecker.requirePermission(getActiveUser(ctx), "updateConfiguration", null,
2600      Permission.Action.ADMIN);
2601  }
2602
2603  @Override
2604  public void preUpdateMasterConfiguration(ObserverContext<MasterCoprocessorEnvironment> ctx,
2605    Configuration preReloadConf) throws IOException {
2606    accessChecker.requirePermission(getActiveUser(ctx), "updateConfiguration", null,
2607      Permission.Action.ADMIN);
2608  }
2609
2610}