001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.access;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Optional;
026import java.util.Set;
027import java.util.stream.Collectors;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseInterfaceAudience;
033import org.apache.hadoop.hbase.NamespaceDescriptor;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.TableNotFoundException;
036import org.apache.hadoop.hbase.client.Admin;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.Delete;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.RegionInfo;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.ResultScanner;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.client.SnapshotDescription;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
051import org.apache.hadoop.hbase.coprocessor.HasMasterServices;
052import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
053import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
054import org.apache.hadoop.hbase.coprocessor.MasterObserver;
055import org.apache.hadoop.hbase.coprocessor.ObserverContext;
056import org.apache.hadoop.hbase.master.MasterServices;
057import org.apache.hadoop.hbase.security.User;
058import org.apache.hadoop.hbase.security.UserProvider;
059import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper.PathHelper;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.Pair;
062import org.apache.yetus.audience.InterfaceAudience;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
067
068/**
069 * Set HDFS ACLs to hFiles to make HBase granted users have permission to scan snapshot
070 * <p>
071 * To use this feature, please make sure the HDFS configs are set as follows:
072 * <ul>
073 * <li>dfs.namenode.acls.enabled = true</li>
074 * <li>fs.permissions.umask-mode = 027 (or smaller umask than 027)</li>
075 * </ul>
076 * </p>
077 * <p>
078 * The implementation of this feature is as follows:
079 * <ul>
080 * <li>For common directories such as 'data' and 'archive', set other permission to '--x' to make
081 * everyone have the permission to access the directory.</li>
082 * <li>For namespace or table directories such as 'data/ns/table', 'archive/ns/table' and
083 * '.hbase-snapshot/snapshotName', set user 'r-x' access acl and 'r-x' default acl when following
084 * operations happen:
085 * <ul>
086 * <li>grant user with global, namespace or table permission;</li>
087 * <li>revoke user from global, namespace or table;</li>
088 * <li>snapshot table;</li>
089 * <li>truncate table;</li>
090 * </ul>
091 * </li>
092 * <li>Note: Because snapshots are at table level, this feature only considers users with global,
093 * namespace or table permissions, and ignores users with table CF or cell permissions.</li>
094 * </ul>
095 * </p>
096 */
097@CoreCoprocessor
098@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
099public class SnapshotScannerHDFSAclController implements MasterCoprocessor, MasterObserver {
100  private static final Logger LOG = LoggerFactory.getLogger(SnapshotScannerHDFSAclController.class);
101
102  private SnapshotScannerHDFSAclHelper hdfsAclHelper = null;
103  private PathHelper pathHelper = null;
104  private MasterServices masterServices = null;
105  private volatile boolean initialized = false;
106  private volatile boolean aclTableInitialized = false;
107  /** Provider for mapping principal names to Users */
108  private UserProvider userProvider;
109
110  @Override
111  public Optional<MasterObserver> getMasterObserver() {
112    return Optional.of(this);
113  }
114
115  @Override
116  public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> c)
117    throws IOException {
118    if (
119      c.getEnvironment().getConfiguration()
120        .getBoolean(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, false)
121    ) {
122      MasterCoprocessorEnvironment mEnv = c.getEnvironment();
123      if (!(mEnv instanceof HasMasterServices)) {
124        throw new IOException("Does not implement HMasterServices");
125      }
126      masterServices = ((HasMasterServices) mEnv).getMasterServices();
127      hdfsAclHelper = new SnapshotScannerHDFSAclHelper(masterServices.getConfiguration(),
128        masterServices.getConnection());
129      pathHelper = hdfsAclHelper.getPathHelper();
130      hdfsAclHelper.setCommonDirectoryPermission();
131      initialized = true;
132      userProvider = UserProvider.instantiate(c.getEnvironment().getConfiguration());
133    } else {
134      LOG.warn("Try to initialize the coprocessor SnapshotScannerHDFSAclController but failure "
135        + "because the config " + SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE
136        + " is false.");
137    }
138  }
139
140  @Override
141  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> c) throws IOException {
142    if (!initialized) {
143      return;
144    }
145    try (Admin admin = c.getEnvironment().getConnection().getAdmin()) {
146      if (admin.tableExists(PermissionStorage.ACL_TABLE_NAME)) {
147        // Check if acl table has 'm' CF, if not, add 'm' CF
148        TableDescriptor tableDescriptor = admin.getDescriptor(PermissionStorage.ACL_TABLE_NAME);
149        boolean containHdfsAclFamily = Arrays.stream(tableDescriptor.getColumnFamilies()).anyMatch(
150          family -> Bytes.equals(family.getName(), SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY));
151        if (!containHdfsAclFamily) {
152          TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDescriptor)
153            .setColumnFamily(ColumnFamilyDescriptorBuilder
154              .newBuilder(SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY).build());
155          admin.modifyTable(builder.build());
156        }
157        aclTableInitialized = true;
158      } else {
159        throw new TableNotFoundException(
160          "Table " + PermissionStorage.ACL_TABLE_NAME + " is not created yet. Please check if "
161            + getClass().getName() + " is configured after " + AccessController.class.getName());
162      }
163    }
164  }
165
166  @Override
167  public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c) {
168    if (initialized) {
169      hdfsAclHelper.close();
170    }
171  }
172
173  @Override
174  public void postCompletedCreateTableAction(ObserverContext<MasterCoprocessorEnvironment> c,
175    TableDescriptor desc, RegionInfo[] regions) throws IOException {
176    if (needHandleTableHdfsAcl(desc, "createTable " + desc.getTableName())) {
177      TableName tableName = desc.getTableName();
178      // 1. Create table directories to make HDFS acls can be inherited
179      hdfsAclHelper.createTableDirectories(tableName);
180      // 2. Add table owner HDFS acls
181      String owner =
182        desc.getOwnerString() == null ? getActiveUser(c).getShortName() : desc.getOwnerString();
183      hdfsAclHelper.addTableAcl(tableName, Sets.newHashSet(owner), "create");
184      // 3. Record table owner permission is synced to HDFS in acl table
185      SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(c.getEnvironment().getConnection(), owner,
186        tableName);
187    }
188  }
189
190  @Override
191  public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> c,
192    NamespaceDescriptor ns) throws IOException {
193    if (checkInitialized("createNamespace " + ns.getName())) {
194      // Create namespace directories to make HDFS acls can be inherited
195      List<Path> paths = hdfsAclHelper.getNamespaceRootPaths(ns.getName());
196      for (Path path : paths) {
197        hdfsAclHelper.createDirIfNotExist(path);
198      }
199    }
200  }
201
202  @Override
203  public void postCompletedSnapshotAction(ObserverContext<MasterCoprocessorEnvironment> c,
204    SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws IOException {
205    if (needHandleTableHdfsAcl(tableDescriptor, "snapshot " + snapshot.getName())) {
206      // Add HDFS acls of users with table read permission to snapshot files
207      hdfsAclHelper.snapshotAcl(snapshot);
208    }
209  }
210
211  @Override
212  public void postCompletedTruncateTableAction(ObserverContext<MasterCoprocessorEnvironment> c,
213    TableName tableName) throws IOException {
214    if (needHandleTableHdfsAcl(tableName, "truncateTable " + tableName)) {
215      // 1. create tmp table directories
216      hdfsAclHelper.createTableDirectories(tableName);
217      // 2. Since the table directories is recreated, so add HDFS acls again
218      Set<String> users = hdfsAclHelper.getUsersWithTableReadAction(tableName, false, false);
219      hdfsAclHelper.addTableAcl(tableName, users, "truncate");
220    }
221  }
222
223  @Override
224  public void postCompletedDeleteTableAction(ObserverContext<MasterCoprocessorEnvironment> ctx,
225    TableName tableName) throws IOException {
226    if (!tableName.isSystemTable() && checkInitialized("deleteTable " + tableName)) {
227      /*
228       * Remove table user access HDFS acl from namespace directory if the user has no permissions
229       * of global, ns of the table or other tables of the ns, eg: Bob has 'ns1:t1' read permission,
230       * when delete 'ns1:t1', if Bob has global read permission, '@ns1' read permission or
231       * 'ns1:other_tables' read permission, then skip remove Bob access acl in ns1Dirs, otherwise,
232       * remove Bob access acl.
233       */
234      try (Table aclTable =
235        ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
236        Set<String> users = SnapshotScannerHDFSAclStorage.getTableUsers(aclTable, tableName);
237        if (users.size() > 0) {
238          // 1. Remove table archive directory default ACLs
239          hdfsAclHelper.removeTableDefaultAcl(tableName, users);
240          // 2. Delete table owner permission is synced to HDFS in acl table
241          SnapshotScannerHDFSAclStorage.deleteTableHdfsAcl(aclTable, tableName);
242          // 3. Remove namespace access acls
243          Set<String> removeUsers = filterUsersToRemoveNsAccessAcl(aclTable, tableName, users);
244          if (removeUsers.size() > 0) {
245            hdfsAclHelper.removeNamespaceAccessAcl(tableName, removeUsers, "delete");
246          }
247        }
248      }
249    }
250  }
251
252  @Override
253  public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
254    TableName tableName, TableDescriptor oldDescriptor, TableDescriptor currentDescriptor)
255    throws IOException {
256    try (Table aclTable =
257      ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
258      if (
259        needHandleTableHdfsAcl(currentDescriptor, "modifyTable " + tableName)
260          && !hdfsAclHelper.isAclSyncToHdfsEnabled(oldDescriptor)
261      ) {
262        // 1. Create table directories used for acl inherited
263        hdfsAclHelper.createTableDirectories(tableName);
264        // 2. Add table users HDFS acls
265        Set<String> tableUsers = hdfsAclHelper.getUsersWithTableReadAction(tableName, false, false);
266        Set<String> users =
267          hdfsAclHelper.getUsersWithNamespaceReadAction(tableName.getNamespaceAsString(), true);
268        users.addAll(tableUsers);
269        hdfsAclHelper.addTableAcl(tableName, users, "modify");
270        // 3. Record table user acls are synced to HDFS in acl table
271        SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(ctx.getEnvironment().getConnection(),
272          tableUsers, tableName);
273      } else if (
274        needHandleTableHdfsAcl(oldDescriptor, "modifyTable " + tableName)
275          && !hdfsAclHelper.isAclSyncToHdfsEnabled(currentDescriptor)
276      ) {
277        // 1. Remove empty table directories
278        List<Path> tableRootPaths = hdfsAclHelper.getTableRootPaths(tableName, false);
279        for (Path path : tableRootPaths) {
280          hdfsAclHelper.deleteEmptyDir(path);
281        }
282        // 2. Remove all table HDFS acls
283        Set<String> tableUsers = hdfsAclHelper.getUsersWithTableReadAction(tableName, false, false);
284        Set<String> users =
285          hdfsAclHelper.getUsersWithNamespaceReadAction(tableName.getNamespaceAsString(), true);
286        users.addAll(tableUsers);
287        hdfsAclHelper.removeTableAcl(tableName, users);
288        // 3. Remove namespace access HDFS acls for users who only own permission for this table
289        hdfsAclHelper.removeNamespaceAccessAcl(tableName,
290          filterUsersToRemoveNsAccessAcl(aclTable, tableName, tableUsers), "modify");
291        // 4. Record table user acl is not synced to HDFS
292        SnapshotScannerHDFSAclStorage.deleteUserTableHdfsAcl(ctx.getEnvironment().getConnection(),
293          tableUsers, tableName);
294      }
295    }
296  }
297
298  @Override
299  public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
300    String namespace) throws IOException {
301    if (checkInitialized("deleteNamespace " + namespace)) {
302      try (Table aclTable =
303        ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
304        // 1. Delete namespace archive dir default ACLs
305        Set<String> users = SnapshotScannerHDFSAclStorage.getEntryUsers(aclTable,
306          PermissionStorage.toNamespaceEntry(Bytes.toBytes(namespace)));
307        hdfsAclHelper.removeNamespaceDefaultAcl(namespace, users);
308        // 2. Record namespace user acl is not synced to HDFS
309        SnapshotScannerHDFSAclStorage.deleteNamespaceHdfsAcl(ctx.getEnvironment().getConnection(),
310          namespace);
311        // 3. Delete tmp namespace directory
312        /**
313         * Delete namespace tmp directory because it's created by this coprocessor when namespace is
314         * created to make namespace default acl can be inherited by tables. The namespace data
315         * directory is deleted by DeleteNamespaceProcedure, the namespace archive directory is
316         * deleted by HFileCleaner.
317         */
318        hdfsAclHelper.deleteEmptyDir(pathHelper.getTmpNsDir(namespace));
319      }
320    }
321  }
322
323  @Override
324  public void postGrant(ObserverContext<MasterCoprocessorEnvironment> c,
325    UserPermission userPermission, boolean mergeExistingPermissions) throws IOException {
326    if (
327      !checkInitialized(
328        "grant " + userPermission + ", merge existing permissions " + mergeExistingPermissions)
329    ) {
330      return;
331    }
332    try (Table aclTable =
333      c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
334      Configuration conf = c.getEnvironment().getConfiguration();
335      String userName = userPermission.getUser();
336      switch (userPermission.getAccessScope()) {
337        case GLOBAL:
338          UserPermission perm = getUserGlobalPermission(conf, userName);
339          if (perm != null && hdfsAclHelper.containReadAction(perm)) {
340            if (!isHdfsAclSet(aclTable, userName)) {
341              // 1. Get namespaces and tables which global user acls are already synced
342              Pair<Set<String>, Set<TableName>> skipNamespaceAndTables =
343                SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName);
344              Set<String> skipNamespaces = skipNamespaceAndTables.getFirst();
345              Set<TableName> skipTables = skipNamespaceAndTables.getSecond().stream()
346                .filter(t -> !skipNamespaces.contains(t.getNamespaceAsString()))
347                .collect(Collectors.toSet());
348              // 2. Add HDFS acl(skip namespaces and tables directories whose acl is set)
349              hdfsAclHelper.grantAcl(userPermission, skipNamespaces, skipTables);
350              // 3. Record global acl is sync to HDFS
351              SnapshotScannerHDFSAclStorage.addUserGlobalHdfsAcl(aclTable, userName);
352            }
353          } else {
354            // The merged user permission doesn't contain READ, so remove user global HDFS acls if
355            // it's set
356            removeUserGlobalHdfsAcl(aclTable, userName, userPermission);
357          }
358          break;
359        case NAMESPACE:
360          String namespace = ((NamespacePermission) userPermission.getPermission()).getNamespace();
361          UserPermission nsPerm = getUserNamespacePermission(conf, userName, namespace);
362          if (nsPerm != null && hdfsAclHelper.containReadAction(nsPerm)) {
363            if (!isHdfsAclSet(aclTable, userName, namespace)) {
364              // 1. Get tables which namespace user acls are already synced
365              Set<TableName> skipTables = SnapshotScannerHDFSAclStorage
366                .getUserNamespaceAndTable(aclTable, userName).getSecond();
367              // 2. Add HDFS acl(skip tables directories whose acl is set)
368              hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), skipTables);
369            }
370            // 3. Record namespace acl is synced to HDFS
371            SnapshotScannerHDFSAclStorage.addUserNamespaceHdfsAcl(aclTable, userName, namespace);
372          } else {
373            // The merged user permission doesn't contain READ, so remove user namespace HDFS acls
374            // if it's set
375            removeUserNamespaceHdfsAcl(aclTable, userName, namespace, userPermission);
376          }
377          break;
378        case TABLE:
379          TablePermission tablePerm = (TablePermission) userPermission.getPermission();
380          if (needHandleTableHdfsAcl(tablePerm)) {
381            TableName tableName = tablePerm.getTableName();
382            UserPermission tPerm = getUserTablePermission(conf, userName, tableName);
383            if (tPerm != null && hdfsAclHelper.containReadAction(tPerm)) {
384              if (!isHdfsAclSet(aclTable, userName, tableName)) {
385                // 1. create table dirs
386                hdfsAclHelper.createTableDirectories(tableName);
387                // 2. Add HDFS acl
388                hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), new HashSet<>(0));
389              }
390              // 2. Record table acl is synced to HDFS
391              SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(aclTable, userName, tableName);
392            } else {
393              // The merged user permission doesn't contain READ, so remove user table HDFS acls if
394              // it's set
395              removeUserTableHdfsAcl(aclTable, userName, tableName, userPermission);
396            }
397          }
398          break;
399        default:
400          throw new IllegalArgumentException(
401            "Illegal user permission scope " + userPermission.getAccessScope());
402      }
403    }
404  }
405
406  @Override
407  public void postRevoke(ObserverContext<MasterCoprocessorEnvironment> c,
408    UserPermission userPermission) throws IOException {
409    if (checkInitialized("revoke " + userPermission)) {
410      try (Table aclTable =
411        c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
412        String userName = userPermission.getUser();
413        Configuration conf = c.getEnvironment().getConfiguration();
414        switch (userPermission.getAccessScope()) {
415          case GLOBAL:
416            UserPermission userGlobalPerm = getUserGlobalPermission(conf, userName);
417            if (userGlobalPerm == null || !hdfsAclHelper.containReadAction(userGlobalPerm)) {
418              removeUserGlobalHdfsAcl(aclTable, userName, userPermission);
419            }
420            break;
421          case NAMESPACE:
422            NamespacePermission nsPerm = (NamespacePermission) userPermission.getPermission();
423            UserPermission userNsPerm =
424              getUserNamespacePermission(conf, userName, nsPerm.getNamespace());
425            if (userNsPerm == null || !hdfsAclHelper.containReadAction(userNsPerm)) {
426              removeUserNamespaceHdfsAcl(aclTable, userName, nsPerm.getNamespace(), userPermission);
427            }
428            break;
429          case TABLE:
430            TablePermission tPerm = (TablePermission) userPermission.getPermission();
431            if (needHandleTableHdfsAcl(tPerm)) {
432              TableName tableName = tPerm.getTableName();
433              UserPermission userTablePerm = getUserTablePermission(conf, userName, tableName);
434              if (userTablePerm == null || !hdfsAclHelper.containReadAction(userTablePerm)) {
435                removeUserTableHdfsAcl(aclTable, userName, tableName, userPermission);
436              }
437            }
438            break;
439          default:
440            throw new IllegalArgumentException(
441              "Illegal user permission scope " + userPermission.getAccessScope());
442        }
443      }
444    }
445  }
446
447  private void removeUserGlobalHdfsAcl(Table aclTable, String userName,
448    UserPermission userPermission) throws IOException {
449    if (SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
450      // 1. Get namespaces and tables which global user acls are already synced
451      Pair<Set<String>, Set<TableName>> namespaceAndTable =
452        SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName);
453      Set<String> skipNamespaces = namespaceAndTable.getFirst();
454      Set<TableName> skipTables = namespaceAndTable.getSecond().stream()
455        .filter(t -> !skipNamespaces.contains(t.getNamespaceAsString()))
456        .collect(Collectors.toSet());
457      // 2. Remove user HDFS acls(skip namespaces and tables directories
458      // whose acl must be reversed)
459      hdfsAclHelper.revokeAcl(userPermission, skipNamespaces, skipTables);
460      // 3. Remove global user acl is synced to HDFS in acl table
461      SnapshotScannerHDFSAclStorage.deleteUserGlobalHdfsAcl(aclTable, userName);
462    }
463  }
464
465  private void removeUserNamespaceHdfsAcl(Table aclTable, String userName, String namespace,
466    UserPermission userPermission) throws IOException {
467    if (SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName, namespace)) {
468      if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
469        // 1. Get tables whose namespace user acls are already synced
470        Set<TableName> skipTables =
471          SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName).getSecond();
472        // 2. Remove user HDFS acls(skip tables directories whose acl must be reversed)
473        hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(), skipTables);
474      }
475      // 3. Remove namespace user acl is synced to HDFS in acl table
476      SnapshotScannerHDFSAclStorage.deleteUserNamespaceHdfsAcl(aclTable, userName, namespace);
477    }
478  }
479
480  private void removeUserTableHdfsAcl(Table aclTable, String userName, TableName tableName,
481    UserPermission userPermission) throws IOException {
482    if (SnapshotScannerHDFSAclStorage.hasUserTableHdfsAcl(aclTable, userName, tableName)) {
483      if (
484        !SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)
485          && !SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName,
486            tableName.getNamespaceAsString())
487      ) {
488        // 1. Remove table acls
489        hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(0), new HashSet<>(0));
490      }
491      // 2. Remove table user acl is synced to HDFS in acl table
492      SnapshotScannerHDFSAclStorage.deleteUserTableHdfsAcl(aclTable, userName, tableName);
493    }
494  }
495
496  private UserPermission getUserGlobalPermission(Configuration conf, String userName)
497    throws IOException {
498    List<UserPermission> permissions = PermissionStorage.getUserPermissions(conf,
499      PermissionStorage.ACL_GLOBAL_NAME, null, null, userName, true);
500    return permissions.size() > 0 ? permissions.get(0) : null;
501  }
502
503  private UserPermission getUserNamespacePermission(Configuration conf, String userName,
504    String namespace) throws IOException {
505    List<UserPermission> permissions =
506      PermissionStorage.getUserNamespacePermissions(conf, namespace, userName, true);
507    return permissions.size() > 0 ? permissions.get(0) : null;
508  }
509
510  private UserPermission getUserTablePermission(Configuration conf, String userName,
511    TableName tableName) throws IOException {
512    List<UserPermission> permissions = PermissionStorage
513      .getUserTablePermissions(conf, tableName, null, null, userName, true).stream()
514      .filter(userPermission -> hdfsAclHelper
515        .isNotFamilyOrQualifierPermission((TablePermission) userPermission.getPermission()))
516      .collect(Collectors.toList());
517    return permissions.size() > 0 ? permissions.get(0) : null;
518  }
519
520  private boolean isHdfsAclSet(Table aclTable, String userName) throws IOException {
521    return isHdfsAclSet(aclTable, userName, null, null);
522  }
523
524  private boolean isHdfsAclSet(Table aclTable, String userName, String namespace)
525    throws IOException {
526    return isHdfsAclSet(aclTable, userName, namespace, null);
527  }
528
529  private boolean isHdfsAclSet(Table aclTable, String userName, TableName tableName)
530    throws IOException {
531    return isHdfsAclSet(aclTable, userName, null, tableName);
532  }
533
534  /**
535   * Check if user global/namespace/table HDFS acls is already set
536   */
537  private boolean isHdfsAclSet(Table aclTable, String userName, String namespace,
538    TableName tableName) throws IOException {
539    boolean isSet = SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName);
540    if (namespace != null) {
541      isSet = isSet
542        || SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName, namespace);
543    }
544    if (tableName != null) {
545      isSet = isSet
546        || SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName,
547          tableName.getNamespaceAsString())
548        || SnapshotScannerHDFSAclStorage.hasUserTableHdfsAcl(aclTable, userName, tableName);
549    }
550    return isSet;
551  }
552
553  @InterfaceAudience.Private
554  boolean checkInitialized(String operation) {
555    if (initialized) {
556      if (aclTableInitialized) {
557        return true;
558      } else {
559        LOG.warn("Skip set HDFS acls because acl table is not initialized when {}", operation);
560      }
561    }
562    return false;
563  }
564
565  private boolean needHandleTableHdfsAcl(TablePermission tablePermission) throws IOException {
566    return needHandleTableHdfsAcl(tablePermission.getTableName(), "")
567      && hdfsAclHelper.isNotFamilyOrQualifierPermission(tablePermission);
568  }
569
570  private boolean needHandleTableHdfsAcl(TableName tableName, String operation) throws IOException {
571    return !tableName.isSystemTable() && checkInitialized(operation)
572      && hdfsAclHelper.isAclSyncToHdfsEnabled(masterServices.getTableDescriptors().get(tableName));
573  }
574
575  private boolean needHandleTableHdfsAcl(TableDescriptor tableDescriptor, String operation) {
576    TableName tableName = tableDescriptor.getTableName();
577    return !tableName.isSystemTable() && checkInitialized(operation)
578      && hdfsAclHelper.isAclSyncToHdfsEnabled(tableDescriptor);
579  }
580
581  private User getActiveUser(ObserverContext<?> ctx) throws IOException {
582    // for non-rpc handling, fallback to system user
583    Optional<User> optionalUser = ctx.getCaller();
584    if (optionalUser.isPresent()) {
585      return optionalUser.get();
586    }
587    return userProvider.getCurrent();
588  }
589
590  /**
591   * Remove table user access HDFS acl from namespace directory if the user has no permissions of
592   * global, ns of the table or other tables of the ns, eg: Bob has 'ns1:t1' read permission, when
593   * delete 'ns1:t1', if Bob has global read permission, '@ns1' read permission or
594   * 'ns1:other_tables' read permission, then skip remove Bob access acl in ns1Dirs, otherwise,
595   * remove Bob access acl.
596   * @param aclTable    acl table
597   * @param tableName   the name of the table
598   * @param tablesUsers table users set
599   * @return users whose access acl will be removed from the namespace of the table
600   * @throws IOException if an error occurred
601   */
602  private Set<String> filterUsersToRemoveNsAccessAcl(Table aclTable, TableName tableName,
603    Set<String> tablesUsers) throws IOException {
604    Set<String> removeUsers = new HashSet<>();
605    byte[] namespace = tableName.getNamespace();
606    for (String user : tablesUsers) {
607      List<byte[]> userEntries = SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, user);
608      boolean remove = true;
609      for (byte[] entry : userEntries) {
610        if (
611          PermissionStorage.isGlobalEntry(entry)
612            || (PermissionStorage.isNamespaceEntry(entry)
613              && Bytes.equals(PermissionStorage.fromNamespaceEntry(entry), namespace))
614            || (PermissionStorage.isTableEntry(entry) && !Bytes.equals(tableName.getName(), entry)
615              && Bytes.equals(TableName.valueOf(entry).getNamespace(), namespace))
616        ) {
617          remove = false;
618          break;
619        }
620      }
621      if (remove) {
622        removeUsers.add(user);
623      }
624    }
625    return removeUsers;
626  }
627
628  static final class SnapshotScannerHDFSAclStorage {
629    /**
630     * Add a new CF in HBase acl table to record if the HBase read permission is synchronized to
631     * related hfiles. The record has two usages: 1. check if we need to remove HDFS acls for a
632     * grant without READ permission(eg: grant user table read permission and then grant user table
633     * write permission without merging the existing permissions, in this case, need to remove HDFS
634     * acls); 2. skip some HDFS acl sync because it may be already set(eg: grant user table read
635     * permission and then grant user ns read permission; grant user table read permission and then
636     * grant user table write permission with merging the existing permissions).
637     */
638    static final byte[] HDFS_ACL_FAMILY = Bytes.toBytes("m");
639    // The value 'R' has no specific meaning, if cell value is not null, it means that the user HDFS
640    // acls is set to hfiles.
641    private static final byte[] HDFS_ACL_VALUE = Bytes.toBytes("R");
642
643    static void addUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
644      addUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
645    }
646
647    static void addUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
648      throws IOException {
649      addUserEntry(aclTable, user, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
650    }
651
652    static void addUserTableHdfsAcl(Connection connection, Set<String> users, TableName tableName)
653      throws IOException {
654      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
655        for (String user : users) {
656          addUserTableHdfsAcl(aclTable, user, tableName);
657        }
658      }
659    }
660
661    static void addUserTableHdfsAcl(Connection connection, String user, TableName tableName)
662      throws IOException {
663      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
664        addUserTableHdfsAcl(aclTable, user, tableName);
665      }
666    }
667
668    static void addUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
669      throws IOException {
670      addUserEntry(aclTable, user, tableName.getName());
671    }
672
673    private static void addUserEntry(Table t, String user, byte[] entry) throws IOException {
674      Put p = new Put(entry);
675      p.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(user), HDFS_ACL_VALUE);
676      t.put(p);
677    }
678
679    static void deleteUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
680      deleteUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
681    }
682
683    static void deleteUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
684      throws IOException {
685      deleteUserEntry(aclTable, user, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
686    }
687
688    static void deleteUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
689      throws IOException {
690      deleteUserEntry(aclTable, user, tableName.getName());
691    }
692
693    static void deleteUserTableHdfsAcl(Connection connection, Set<String> users,
694      TableName tableName) throws IOException {
695      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
696        for (String user : users) {
697          deleteUserTableHdfsAcl(aclTable, user, tableName);
698        }
699      }
700    }
701
702    private static void deleteUserEntry(Table aclTable, String user, byte[] entry)
703      throws IOException {
704      Delete delete = new Delete(entry);
705      delete.addColumns(HDFS_ACL_FAMILY, Bytes.toBytes(user));
706      aclTable.delete(delete);
707    }
708
709    static void deleteNamespaceHdfsAcl(Connection connection, String namespace) throws IOException {
710      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
711        deleteEntry(aclTable, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
712      }
713    }
714
715    static void deleteTableHdfsAcl(Table aclTable, TableName tableName) throws IOException {
716      deleteEntry(aclTable, tableName.getName());
717    }
718
719    private static void deleteEntry(Table aclTable, byte[] entry) throws IOException {
720      Delete delete = new Delete(entry);
721      delete.addFamily(HDFS_ACL_FAMILY);
722      aclTable.delete(delete);
723    }
724
725    static Set<String> getTableUsers(Table aclTable, TableName tableName) throws IOException {
726      return getEntryUsers(aclTable, tableName.getName());
727    }
728
729    private static Set<String> getEntryUsers(Table aclTable, byte[] entry) throws IOException {
730      Set<String> users = new HashSet<>();
731      Get get = new Get(entry);
732      get.addFamily(HDFS_ACL_FAMILY);
733      Result result = aclTable.get(get);
734      List<Cell> cells = result.listCells();
735      if (cells != null) {
736        for (Cell cell : cells) {
737          if (cell != null) {
738            users.add(Bytes.toString(CellUtil.cloneQualifier(cell)));
739          }
740        }
741      }
742      return users;
743    }
744
745    static Pair<Set<String>, Set<TableName>> getUserNamespaceAndTable(Table aclTable,
746      String userName) throws IOException {
747      Set<String> namespaces = new HashSet<>();
748      Set<TableName> tables = new HashSet<>();
749      List<byte[]> userEntries = getUserEntries(aclTable, userName);
750      for (byte[] entry : userEntries) {
751        if (PermissionStorage.isNamespaceEntry(entry)) {
752          namespaces.add(Bytes.toString(PermissionStorage.fromNamespaceEntry(entry)));
753        } else if (PermissionStorage.isTableEntry(entry)) {
754          tables.add(TableName.valueOf(entry));
755        }
756      }
757      return new Pair<>(namespaces, tables);
758    }
759
760    static List<byte[]> getUserEntries(Table aclTable, String userName) throws IOException {
761      Scan scan = new Scan();
762      scan.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(userName));
763      ResultScanner scanner = aclTable.getScanner(scan);
764      List<byte[]> entry = new ArrayList<>();
765      for (Result result : scanner) {
766        if (result != null && result.getRow() != null) {
767          entry.add(result.getRow());
768        }
769      }
770      return entry;
771    }
772
773    static boolean hasUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
774      return hasUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
775    }
776
777    static boolean hasUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
778      throws IOException {
779      return hasUserEntry(aclTable, user,
780        Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
781    }
782
783    static boolean hasUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
784      throws IOException {
785      return hasUserEntry(aclTable, user, tableName.getName());
786    }
787
788    private static boolean hasUserEntry(Table aclTable, String userName, byte[] entry)
789      throws IOException {
790      Get get = new Get(entry);
791      get.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(userName));
792      return aclTable.exists(get);
793    }
794  }
795}