001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.access;
019
020import java.io.ByteArrayInputStream;
021import java.io.DataInput;
022import java.io.DataInputStream;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.TreeMap;
031import java.util.TreeSet;
032import org.apache.commons.lang3.StringUtils;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.AuthUtil;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.Cell.Type;
037import org.apache.hadoop.hbase.CellBuilderFactory;
038import org.apache.hadoop.hbase.CellBuilderType;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.CompareOperator;
041import org.apache.hadoop.hbase.NamespaceDescriptor;
042import org.apache.hadoop.hbase.PrivateCellUtil;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.Tag;
045import org.apache.hadoop.hbase.TagType;
046import org.apache.hadoop.hbase.client.Connection;
047import org.apache.hadoop.hbase.client.ConnectionFactory;
048import org.apache.hadoop.hbase.client.Delete;
049import org.apache.hadoop.hbase.client.Get;
050import org.apache.hadoop.hbase.client.Put;
051import org.apache.hadoop.hbase.client.Result;
052import org.apache.hadoop.hbase.client.ResultScanner;
053import org.apache.hadoop.hbase.client.Scan;
054import org.apache.hadoop.hbase.client.Table;
055import org.apache.hadoop.hbase.client.TableDescriptor;
056import org.apache.hadoop.hbase.exceptions.DeserializationException;
057import org.apache.hadoop.hbase.filter.QualifierFilter;
058import org.apache.hadoop.hbase.filter.RegexStringComparator;
059import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
060import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
061import org.apache.hadoop.hbase.regionserver.InternalScanner;
062import org.apache.hadoop.hbase.regionserver.Region;
063import org.apache.hadoop.hbase.security.User;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.Pair;
066import org.apache.hadoop.io.Text;
067import org.apache.hadoop.io.Writable;
068import org.apache.hadoop.io.WritableFactories;
069import org.apache.hadoop.io.WritableUtils;
070import org.apache.yetus.audience.InterfaceAudience;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
075import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
076import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
077
078/**
079 * Maintains lists of permission grants to users and groups to allow for authorization checks by
080 * {@link AccessController}.
081 * <p>
082 * Access control lists are stored in an "internal" metadata table named {@code _acl_}. Each table's
083 * permission grants are stored as a separate row, keyed by the table name. KeyValues for
084 * permissions assignments are stored in one of the formats:
085 *
086 * <pre>
087 * Key                      Desc
088 * --------                 --------
089 * user                     table level permissions for a user [R=read, W=write]
090 * group                    table level permissions for a group
091 * user,family              column family level permissions for a user
092 * group,family             column family level permissions for a group
093 * user,family,qualifier    column qualifier level permissions for a user
094 * group,family,qualifier   column qualifier level permissions for a group
095 * </pre>
096 * <p>
 * All values are encoded as byte arrays containing the codes from the
 * org.apache.hadoop.hbase.security.access.Permission.Action enum.
099 * </p>
100 */
101@InterfaceAudience.Private
102public final class PermissionStorage {
103  /** Internal storage table for access control lists */
104  public static final TableName ACL_TABLE_NAME =
105    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
106  public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
107  /** Column family used to store ACL grants */
108  public static final String ACL_LIST_FAMILY_STR = "l";
109  public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
110  /** KV tag to store per cell access control lists */
111  public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;
112
113  public static final char NAMESPACE_PREFIX = '@';
114
115  /**
116   * Delimiter to separate user, column family, and qualifier in _acl_ table info: column keys
117   */
118  public static final char ACL_KEY_DELIMITER = ',';
119
120  private static final Logger LOG = LoggerFactory.getLogger(PermissionStorage.class);
121
122  private PermissionStorage() {
123  }
124
125  /**
126   * Stores a new user permission grant in the access control lists table.
127   * @param conf     the configuration
128   * @param userPerm the details of the permission to be granted
129   * @param t        acl table instance. It is closed upon method return.
130   * @throws IOException in the case of an error accessing the metadata table
131   */
132  public static void addUserPermission(Configuration conf, UserPermission userPerm, Table t,
133    boolean mergeExistingPermissions) throws IOException {
134    Permission permission = userPerm.getPermission();
135    Permission.Action[] actions = permission.getActions();
136    byte[] rowKey = userPermissionRowKey(permission);
137    Put p = new Put(rowKey);
138    byte[] key = userPermissionKey(userPerm);
139
140    if ((actions == null) || (actions.length == 0)) {
141      String msg = "No actions associated with user '" + userPerm.getUser() + "'";
142      LOG.warn(msg);
143      throw new IOException(msg);
144    }
145
146    Set<Permission.Action> actionSet = new TreeSet<Permission.Action>();
147    if (mergeExistingPermissions) {
148      List<UserPermission> perms = getUserPermissions(conf, rowKey, null, null, null, false);
149      UserPermission currentPerm = null;
150      for (UserPermission perm : perms) {
151        if (userPerm.equalsExceptActions(perm)) {
152          currentPerm = perm;
153          break;
154        }
155      }
156
157      if (currentPerm != null && currentPerm.getPermission().getActions() != null) {
158        actionSet.addAll(Arrays.asList(currentPerm.getPermission().getActions()));
159      }
160    }
161
162    // merge current action with new action.
163    actionSet.addAll(Arrays.asList(actions));
164
165    // serialize to byte array.
166    byte[] value = new byte[actionSet.size()];
167    int index = 0;
168    for (Permission.Action action : actionSet) {
169      value[index++] = action.code();
170    }
171    p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(p.getRow())
172      .setFamily(ACL_LIST_FAMILY).setQualifier(key).setTimestamp(p.getTimestamp()).setType(Type.Put)
173      .setValue(value).build());
174    if (LOG.isDebugEnabled()) {
175      LOG.debug("Writing permission with rowKey " + Bytes.toString(rowKey) + " "
176        + Bytes.toString(key) + ": " + Bytes.toStringBinary(value));
177    }
178    try {
179      t.put(p);
180    } finally {
181      t.close();
182    }
183  }
184
185  static void addUserPermission(Configuration conf, UserPermission userPerm, Table t)
186    throws IOException {
187    addUserPermission(conf, userPerm, t, false);
188  }
189
190  /**
191   * Removes a previously granted permission from the stored access control lists. The
192   * {@link TablePermission} being removed must exactly match what is stored -- no wildcard matching
193   * is attempted. Ie, if user "bob" has been granted "READ" access to the "data" table, but only to
194   * column family plus qualifier "info:colA", then trying to call this method with only user "bob"
195   * and the table name "data" (but without specifying the column qualifier "info:colA") will have
196   * no effect.
197   * @param conf     the configuration
198   * @param userPerm the details of the permission to be revoked
199   * @param t        acl table
200   * @throws IOException if there is an error accessing the metadata table
201   */
202  public static void removeUserPermission(Configuration conf, UserPermission userPerm, Table t)
203    throws IOException {
204    if (
205      null == userPerm.getPermission().getActions()
206        || userPerm.getPermission().getActions().length == 0
207    ) {
208      removePermissionRecord(conf, userPerm, t);
209    } else {
210      // Get all the global user permissions from the acl table
211      List<UserPermission> permsList = getUserPermissions(conf,
212        userPermissionRowKey(userPerm.getPermission()), null, null, null, false);
213      List<Permission.Action> remainingActions = new ArrayList<>();
214      List<Permission.Action> dropActions = Arrays.asList(userPerm.getPermission().getActions());
215      for (UserPermission perm : permsList) {
216        // Find the user and remove only the requested permissions
217        if (perm.getUser().equals(userPerm.getUser())) {
218          for (Permission.Action oldAction : perm.getPermission().getActions()) {
219            if (!dropActions.contains(oldAction)) {
220              remainingActions.add(oldAction);
221            }
222          }
223          if (!remainingActions.isEmpty()) {
224            perm.getPermission()
225              .setActions(remainingActions.toArray(new Permission.Action[remainingActions.size()]));
226            addUserPermission(conf, perm, t);
227          } else {
228            removePermissionRecord(conf, userPerm, t);
229          }
230          break;
231        }
232      }
233    }
234    if (LOG.isDebugEnabled()) {
235      LOG.debug("Removed permission " + userPerm.toString());
236    }
237  }
238
239  private static void removePermissionRecord(Configuration conf, UserPermission userPerm, Table t)
240    throws IOException {
241    Delete d = new Delete(userPermissionRowKey(userPerm.getPermission()));
242    d.addColumns(ACL_LIST_FAMILY, userPermissionKey(userPerm));
243    try {
244      t.delete(d);
245    } finally {
246      t.close();
247    }
248  }
249
250  /**
251   * Remove specified table from the _acl_ table.
252   */
253  static void removeTablePermissions(Configuration conf, TableName tableName, Table t)
254    throws IOException {
255    Delete d = new Delete(tableName.getName());
256    d.addFamily(ACL_LIST_FAMILY);
257
258    if (LOG.isDebugEnabled()) {
259      LOG.debug("Removing permissions of removed table " + tableName);
260    }
261    try {
262      t.delete(d);
263    } finally {
264      t.close();
265    }
266  }
267
268  /**
269   * Remove specified namespace from the acl table.
270   */
271  static void removeNamespacePermissions(Configuration conf, String namespace, Table t)
272    throws IOException {
273    Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
274    d.addFamily(ACL_LIST_FAMILY);
275    if (LOG.isDebugEnabled()) {
276      LOG.debug("Removing permissions of removed namespace " + namespace);
277    }
278
279    try {
280      t.delete(d);
281    } finally {
282      t.close();
283    }
284  }
285
286  static private void removeTablePermissions(TableName tableName, byte[] column, Table table,
287    boolean closeTable) throws IOException {
288    Scan scan = new Scan();
289    scan.addFamily(ACL_LIST_FAMILY);
290
291    String columnName = Bytes.toString(column);
292    scan.setFilter(new QualifierFilter(CompareOperator.EQUAL,
293      new RegexStringComparator(String.format("(%s%s%s)|(%s%s)$", ACL_KEY_DELIMITER, columnName,
294        ACL_KEY_DELIMITER, ACL_KEY_DELIMITER, columnName))));
295
296    Set<byte[]> qualifierSet = new TreeSet<>(Bytes.BYTES_COMPARATOR);
297    ResultScanner scanner = null;
298    try {
299      scanner = table.getScanner(scan);
300      for (Result res : scanner) {
301        for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
302          qualifierSet.add(q);
303        }
304      }
305
306      if (qualifierSet.size() > 0) {
307        Delete d = new Delete(tableName.getName());
308        for (byte[] qualifier : qualifierSet) {
309          d.addColumns(ACL_LIST_FAMILY, qualifier);
310        }
311        table.delete(d);
312      }
313    } finally {
314      if (scanner != null) {
315        scanner.close();
316      }
317      if (closeTable) {
318        table.close();
319      }
320    }
321  }
322
323  /**
324   * Remove specified table column from the acl table.
325   */
326  static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column,
327    Table t) throws IOException {
328    if (LOG.isDebugEnabled()) {
329      LOG.debug("Removing permissions of removed column " + Bytes.toString(column) + " from table "
330        + tableName);
331    }
332    removeTablePermissions(tableName, column, t, true);
333  }
334
335  static byte[] userPermissionRowKey(Permission permission) {
336    byte[] row;
337    if (permission instanceof TablePermission) {
338      TablePermission tablePerm = (TablePermission) permission;
339      row = tablePerm.getTableName().getName();
340    } else if (permission instanceof NamespacePermission) {
341      NamespacePermission nsPerm = (NamespacePermission) permission;
342      row = Bytes.toBytes(toNamespaceEntry(nsPerm.getNamespace()));
343    } else {
344      // permission instanceof TablePermission
345      row = ACL_GLOBAL_NAME;
346    }
347    return row;
348  }
349
350  /**
351   * Build qualifier key from user permission: username username,family username,family,qualifier
352   */
353  static byte[] userPermissionKey(UserPermission permission) {
354    byte[] key = Bytes.toBytes(permission.getUser());
355    byte[] qualifier = null;
356    byte[] family = null;
357    if (permission.getPermission().getAccessScope() == Permission.Scope.TABLE) {
358      TablePermission tablePermission = (TablePermission) permission.getPermission();
359      family = tablePermission.getFamily();
360      qualifier = tablePermission.getQualifier();
361    }
362
363    if (family != null && family.length > 0) {
364      key = Bytes.add(key, Bytes.add(new byte[] { ACL_KEY_DELIMITER }, family));
365      if (qualifier != null && qualifier.length > 0) {
366        key = Bytes.add(key, Bytes.add(new byte[] { ACL_KEY_DELIMITER }, qualifier));
367      }
368    }
369
370    return key;
371  }
372
373  /**
374   * Returns {@code true} if the given region is part of the {@code _acl_} metadata table.
375   */
376  static boolean isAclRegion(Region region) {
377    return ACL_TABLE_NAME.equals(region.getTableDescriptor().getTableName());
378  }
379
380  /**
381   * Returns {@code true} if the given table is {@code _acl_} metadata table.
382   */
383  static boolean isAclTable(TableDescriptor desc) {
384    return ACL_TABLE_NAME.equals(desc.getTableName());
385  }
386
387  /**
388   * Loads all of the permission grants stored in a region of the {@code _acl_} table.
389   * @param aclRegion the acl region
390   * @return a map of the permissions for this table.
391   * @throws IOException if an error occurs
392   */
393  static Map<byte[], ListMultimap<String, UserPermission>> loadAll(Region aclRegion)
394    throws IOException {
395    if (!isAclRegion(aclRegion)) {
396      throw new IOException("Can only load permissions from " + ACL_TABLE_NAME);
397    }
398
399    Map<byte[], ListMultimap<String, UserPermission>> allPerms =
400      new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
401
402    // do a full scan of _acl_ table
403
404    Scan scan = new Scan();
405    scan.addFamily(ACL_LIST_FAMILY);
406
407    InternalScanner iScanner = null;
408    try {
409      iScanner = aclRegion.getScanner(scan);
410
411      while (true) {
412        List<Cell> row = new ArrayList<>();
413
414        boolean hasNext = iScanner.next(row);
415        ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
416        byte[] entry = null;
417        for (Cell kv : row) {
418          if (entry == null) {
419            entry = CellUtil.cloneRow(kv);
420          }
421          Pair<String, Permission> permissionsOfUserOnTable =
422            parsePermissionRecord(entry, kv, null, null, false, null);
423          if (permissionsOfUserOnTable != null) {
424            String username = permissionsOfUserOnTable.getFirst();
425            Permission permission = permissionsOfUserOnTable.getSecond();
426            perms.put(username, new UserPermission(username, permission));
427          }
428        }
429        if (entry != null) {
430          allPerms.put(entry, perms);
431        }
432        if (!hasNext) {
433          break;
434        }
435      }
436    } finally {
437      if (iScanner != null) {
438        iScanner.close();
439      }
440    }
441
442    return allPerms;
443  }
444
445  /**
446   * Load all permissions from the region server holding {@code _acl_}, primarily intended for
447   * testing purposes.
448   */
449  static Map<byte[], ListMultimap<String, UserPermission>> loadAll(Configuration conf)
450    throws IOException {
451    Map<byte[], ListMultimap<String, UserPermission>> allPerms =
452      new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
453
454    // do a full scan of _acl_, filtering on only first table region rows
455
456    Scan scan = new Scan();
457    scan.addFamily(ACL_LIST_FAMILY);
458
459    ResultScanner scanner = null;
460    // TODO: Pass in a Connection rather than create one each time.
461    try (Connection connection = ConnectionFactory.createConnection(conf)) {
462      try (Table table = connection.getTable(ACL_TABLE_NAME)) {
463        scanner = table.getScanner(scan);
464        try {
465          for (Result row : scanner) {
466            ListMultimap<String, UserPermission> resultPerms =
467              parsePermissions(row.getRow(), row, null, null, null, false);
468            allPerms.put(row.getRow(), resultPerms);
469          }
470        } finally {
471          if (scanner != null) {
472            scanner.close();
473          }
474        }
475      }
476    }
477
478    return allPerms;
479  }
480
481  public static ListMultimap<String, UserPermission> getTablePermissions(Configuration conf,
482    TableName tableName) throws IOException {
483    return getPermissions(conf, tableName != null ? tableName.getName() : null, null, null, null,
484      null, false);
485  }
486
487  public static ListMultimap<String, UserPermission> getNamespacePermissions(Configuration conf,
488    String namespace) throws IOException {
489    return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null, null, null, null,
490      false);
491  }
492
493  public static ListMultimap<String, UserPermission> getGlobalPermissions(Configuration conf)
494    throws IOException {
495    return getPermissions(conf, null, null, null, null, null, false);
496  }
497
498  /**
499   * Reads user permission assignments stored in the <code>l:</code> column family of the first
500   * table row in <code>_acl_</code>.
501   * <p>
502   * See {@link PermissionStorage class documentation} for the key structure used for storage.
503   * </p>
504   */
505  static ListMultimap<String, UserPermission> getPermissions(Configuration conf, byte[] entryName,
506    Table t, byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
507    if (entryName == null) {
508      entryName = ACL_GLOBAL_NAME;
509    }
510    // for normal user tables, we just read the table row from _acl_
511    ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
512    Get get = new Get(entryName);
513    get.addFamily(ACL_LIST_FAMILY);
514    Result row = null;
515    if (t == null) {
516      try (Connection connection = ConnectionFactory.createConnection(conf)) {
517        try (Table table = connection.getTable(ACL_TABLE_NAME)) {
518          row = table.get(get);
519        }
520      }
521    } else {
522      row = t.get(get);
523    }
524    if (!row.isEmpty()) {
525      perms = parsePermissions(entryName, row, cf, cq, user, hasFilterUser);
526    } else {
527      LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
528        + Bytes.toString(entryName));
529    }
530
531    return perms;
532  }
533
534  /**
535   * Returns the currently granted permissions for a given table as the specified user plus
536   * associated permissions.
537   */
538  public static List<UserPermission> getUserTablePermissions(Configuration conf,
539    TableName tableName, byte[] cf, byte[] cq, String userName, boolean hasFilterUser)
540    throws IOException {
541    return getUserPermissions(conf, tableName == null ? null : tableName.getName(), cf, cq,
542      userName, hasFilterUser);
543  }
544
545  /**
546   * Returns the currently granted permissions for a given namespace as the specified user plus
547   * associated permissions.
548   */
549  public static List<UserPermission> getUserNamespacePermissions(Configuration conf,
550    String namespace, String user, boolean hasFilterUser) throws IOException {
551    return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null, null, user,
552      hasFilterUser);
553  }
554
555  /**
556   * Returns the currently granted permissions for a given table/namespace with associated
557   * permissions based on the specified column family, column qualifier and user name.
558   * @param conf          the configuration
559   * @param entryName     Table name or the namespace
560   * @param cf            Column family
561   * @param cq            Column qualifier
562   * @param user          User name to be filtered from permission as requested
563   * @param hasFilterUser true if filter user is provided, otherwise false.
564   * @return List of UserPermissions
565   * @throws IOException on failure
566   */
567  public static List<UserPermission> getUserPermissions(Configuration conf, byte[] entryName,
568    byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
569    ListMultimap<String, UserPermission> allPerms =
570      getPermissions(conf, entryName, null, cf, cq, user, hasFilterUser);
571    List<UserPermission> perms = new ArrayList<>();
572    for (Map.Entry<String, UserPermission> entry : allPerms.entries()) {
573      perms.add(entry.getValue());
574    }
575    return perms;
576  }
577
578  /**
579   * Parse and filter permission based on the specified column family, column qualifier and user
580   * name.
581   */
582  private static ListMultimap<String, UserPermission> parsePermissions(byte[] entryName,
583    Result result, byte[] cf, byte[] cq, String user, boolean hasFilterUser) {
584    ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
585    if (result != null && result.size() > 0) {
586      for (Cell kv : result.rawCells()) {
587        Pair<String, Permission> permissionsOfUserOnTable =
588          parsePermissionRecord(entryName, kv, cf, cq, hasFilterUser, user);
589
590        if (permissionsOfUserOnTable != null) {
591          String username = permissionsOfUserOnTable.getFirst();
592          Permission permission = permissionsOfUserOnTable.getSecond();
593          perms.put(username, new UserPermission(username, permission));
594        }
595      }
596    }
597    return perms;
598  }
599
600  private static Pair<String, Permission> parsePermissionRecord(byte[] entryName, Cell kv,
601    byte[] cf, byte[] cq, boolean filterPerms, String filterUser) {
602    // return X given a set of permissions encoded in the permissionRecord kv.
603    byte[] family = CellUtil.cloneFamily(kv);
604    if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
605      return null;
606    }
607
608    byte[] key = CellUtil.cloneQualifier(kv);
609    byte[] value = CellUtil.cloneValue(kv);
610    if (LOG.isDebugEnabled()) {
611      LOG.debug("Read acl: entry[" + Bytes.toStringBinary(entryName) + "], kv ["
612        + Bytes.toStringBinary(key) + ": " + Bytes.toStringBinary(value) + "]");
613    }
614
615    // check for a column family appended to the key
616    // TODO: avoid the string conversion to make this more efficient
617    String username = Bytes.toString(key);
618
619    // Retrieve group list for the filterUser if cell key is a group.
620    // Group list is not required when filterUser itself a group
621    List<String> filterUserGroups = null;
622    if (filterPerms) {
623      if (
624        username.charAt(0) == '@' && !StringUtils.isEmpty(filterUser) && filterUser.charAt(0) != '@'
625      ) {
626        filterUserGroups = AccessChecker.getUserGroups(filterUser);
627      }
628    }
629
630    // Handle namespace entry
631    if (isNamespaceEntry(entryName)) {
632      // Filter the permissions cell record if client query
633      if (filterPerms && !validateFilterUser(username, filterUser, filterUserGroups)) {
634        return null;
635      }
636
637      return new Pair<>(username, Permission
638        .newBuilder(Bytes.toString(fromNamespaceEntry(entryName))).withActionCodes(value).build());
639    }
640
641    // Handle global entry
642    if (isGlobalEntry(entryName)) {
643      // Filter the permissions cell record if client query
644      if (filterPerms && !validateFilterUser(username, filterUser, filterUserGroups)) {
645        return null;
646      }
647
648      return new Pair<>(username, Permission.newBuilder().withActionCodes(value).build());
649    }
650
651    // Handle table entry
652    int idx = username.indexOf(ACL_KEY_DELIMITER);
653    byte[] permFamily = null;
654    byte[] permQualifier = null;
655    if (idx > 0 && idx < username.length() - 1) {
656      String remainder = username.substring(idx + 1);
657      username = username.substring(0, idx);
658      idx = remainder.indexOf(ACL_KEY_DELIMITER);
659      if (idx > 0 && idx < remainder.length() - 1) {
660        permFamily = Bytes.toBytes(remainder.substring(0, idx));
661        permQualifier = Bytes.toBytes(remainder.substring(idx + 1));
662      } else {
663        permFamily = Bytes.toBytes(remainder);
664      }
665    }
666
667    // Filter the permissions cell record if client query
668    if (filterPerms) {
669      // ACL table contain 3 types of cell key entries; hbase:Acl, namespace and table. So to filter
670      // the permission cell records additional validations are required at CF, CQ and username.
671      // Here we can proceed based on client input whether it contain filterUser.
672      // Validate the filterUser when specified
673      if (filterUser != null && !validateFilterUser(username, filterUser, filterUserGroups)) {
674        return null;
675      }
676      if (!validateCFAndCQ(permFamily, cf, permQualifier, cq)) {
677        return null;
678      }
679    }
680
681    return new Pair<>(username, Permission.newBuilder(TableName.valueOf(entryName))
682      .withFamily(permFamily).withQualifier(permQualifier).withActionCodes(value).build());
683  }
684
685  /*
686   * Validate the cell key with the client filterUser if specified in the query input. 1. If cell
687   * key (username) is not a group then check whether client filterUser is equal to username 2. If
688   * cell key (username) is a group then check whether client filterUser belongs to the cell key
689   * group (username) 3. In case when both filterUser and username are group names then cell will be
690   * filtered if not equal.
691   */
692  private static boolean validateFilterUser(String username, String filterUser,
693    List<String> filterUserGroups) {
694    if (filterUserGroups == null) {
695      // Validate user name or group names whether equal
696      if (filterUser.equals(username)) {
697        return true;
698      }
699    } else {
700      // Check whether filter user belongs to the cell key group.
701      return filterUserGroups.contains(username.substring(1));
702    }
703    return false;
704  }
705
706  /*
707   * Validate the cell with client CF and CQ if specified in the query input. 1. If CF is NULL, then
708   * no need of further validation, result should include all CF and CQ. 2. IF CF specified and
709   * equal then validation required at CQ level if CF specified in client input, otherwise return
710   * all CQ records.
711   */
712  private static boolean validateCFAndCQ(byte[] permFamily, byte[] cf, byte[] permQualifier,
713    byte[] cq) {
714    boolean include = true;
715    if (cf != null) {
716      if (Bytes.equals(cf, permFamily)) {
717        if (cq != null && !Bytes.equals(cq, permQualifier)) {
718          // if CQ specified and didn't match then ignore this cell
719          include = false;
720        }
721      } else {
722        // if CF specified and didn't match then ignore this cell
723        include = false;
724      }
725    }
726    return include;
727  }
728
729  /**
730   * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances and returns the
731   * resulting byte array. Writes a set of permission [user: table permission]
732   */
733  public static byte[] writePermissionsAsBytes(ListMultimap<String, UserPermission> perms,
734    Configuration conf) {
735    return ProtobufUtil
736      .prependPBMagic(AccessControlUtil.toUserTablePermissions(perms).toByteArray());
737  }
738
739  // This is part of the old HbaseObjectWritableFor96Migration.
740  private static final int LIST_CODE = 61;
741
742  private static final int WRITABLE_CODE = 14;
743
744  private static final int WRITABLE_NOT_ENCODED = 0;
745
746  private static List<Permission> readWritableUserPermission(DataInput in, Configuration conf)
747    throws IOException, ClassNotFoundException {
748    assert WritableUtils.readVInt(in) == LIST_CODE;
749    int length = in.readInt();
750    List<Permission> list = new ArrayList<>(length);
751    for (int i = 0; i < length; i++) {
752      assert WritableUtils.readVInt(in) == WRITABLE_CODE;
753      assert WritableUtils.readVInt(in) == WRITABLE_NOT_ENCODED;
754      String className = Text.readString(in);
755      Class<? extends Writable> clazz = conf.getClassByName(className).asSubclass(Writable.class);
756      Writable instance = WritableFactories.newInstance(clazz, conf);
757      instance.readFields(in);
758      list.add((Permission) instance);
759    }
760    return list;
761  }
762
763  public static ListMultimap<String, UserPermission> readUserPermission(byte[] data,
764    Configuration conf) throws DeserializationException {
765    if (ProtobufUtil.isPBMagicPrefix(data)) {
766      int pblen = ProtobufUtil.lengthOfPBMagic();
767      try {
768        AccessControlProtos.UsersAndPermissions.Builder builder =
769          AccessControlProtos.UsersAndPermissions.newBuilder();
770        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
771        return AccessControlUtil.toUserPermission(builder.build());
772      } catch (IOException e) {
773        throw new DeserializationException(e);
774      }
775    } else {
776      // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
777      // forever (here and a couple of other places).
778      ListMultimap<String, UserPermission> userPermission = ArrayListMultimap.create();
779      try {
780        DataInput in = new DataInputStream(new ByteArrayInputStream(data));
781        int length = in.readInt();
782        for (int i = 0; i < length; i++) {
783          String user = Text.readString(in);
784          List<Permission> perms = readWritableUserPermission(in, conf);
785          for (Permission p : perms) {
786            userPermission.put(user, new UserPermission(user, p));
787          }
788        }
789      } catch (IOException | ClassNotFoundException e) {
790        throw new DeserializationException(e);
791      }
792      return userPermission;
793    }
794  }
795
796  public static ListMultimap<String, Permission> readPermissions(byte[] data, Configuration conf)
797    throws DeserializationException {
798    if (ProtobufUtil.isPBMagicPrefix(data)) {
799      int pblen = ProtobufUtil.lengthOfPBMagic();
800      try {
801        AccessControlProtos.UsersAndPermissions.Builder builder =
802          AccessControlProtos.UsersAndPermissions.newBuilder();
803        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
804        return AccessControlUtil.toPermission(builder.build());
805      } catch (IOException e) {
806        throw new DeserializationException(e);
807      }
808    } else {
809      // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
810      // forever (here and a couple of other places).
811      ListMultimap<String, Permission> perms = ArrayListMultimap.create();
812      try {
813        DataInput in = new DataInputStream(new ByteArrayInputStream(data));
814        int length = in.readInt();
815        for (int i = 0; i < length; i++) {
816          String user = Text.readString(in);
817          perms.putAll(user, readWritableUserPermission(in, conf));
818        }
819      } catch (IOException | ClassNotFoundException e) {
820        throw new DeserializationException(e);
821      }
822      return perms;
823    }
824  }
825
826  public static boolean isGlobalEntry(byte[] entryName) {
827    return Bytes.equals(entryName, ACL_GLOBAL_NAME);
828  }
829
830  public static boolean isNamespaceEntry(String entryName) {
831    return isNamespaceEntry(Bytes.toBytes(entryName));
832  }
833
834  public static boolean isNamespaceEntry(byte[] entryName) {
835    return entryName != null && entryName.length != 0 && entryName[0] == NAMESPACE_PREFIX;
836  }
837
838  public static boolean isTableEntry(byte[] entryName) {
839    return !isNamespaceEntry(entryName) && !isGlobalEntry(entryName) && entryName != null;
840  }
841
842  public static String toNamespaceEntry(String namespace) {
843    return NAMESPACE_PREFIX + namespace;
844  }
845
846  public static String fromNamespaceEntry(String namespace) {
847    if (namespace.charAt(0) != NAMESPACE_PREFIX) {
848      throw new IllegalArgumentException("Argument is not a valid namespace entry");
849    }
850    return namespace.substring(1);
851  }
852
853  public static byte[] toNamespaceEntry(byte[] namespace) {
854    byte[] ret = new byte[namespace.length + 1];
855    ret[0] = NAMESPACE_PREFIX;
856    System.arraycopy(namespace, 0, ret, 1, namespace.length);
857    return ret;
858  }
859
860  public static byte[] fromNamespaceEntry(byte[] namespace) {
861    if (namespace[0] != NAMESPACE_PREFIX) {
862      throw new IllegalArgumentException(
863        "Argument is not a valid namespace entry: " + Bytes.toString(namespace));
864    }
865    return Arrays.copyOfRange(namespace, 1, namespace.length);
866  }
867
868  public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
869    throws IOException {
870    // Save an object allocation where we can
871    if (cell.getTagsLength() == 0) {
872      return null;
873    }
874    List<Permission> results = Lists.newArrayList();
875    Iterator<Tag> tagsIterator = PrivateCellUtil.tagsIterator(cell);
876    while (tagsIterator.hasNext()) {
877      Tag tag = tagsIterator.next();
878      if (tag.getType() == ACL_TAG_TYPE) {
879        // Deserialize the table permissions from the KV
880        // TODO: This can be improved. Don't build UsersAndPermissions just to unpack it again,
881        // use the builder
882        AccessControlProtos.UsersAndPermissions.Builder builder =
883          AccessControlProtos.UsersAndPermissions.newBuilder();
884        if (tag.hasArray()) {
885          ProtobufUtil.mergeFrom(builder, tag.getValueArray(), tag.getValueOffset(),
886            tag.getValueLength());
887        } else {
888          ProtobufUtil.mergeFrom(builder, Tag.cloneValue(tag));
889        }
890        ListMultimap<String, Permission> kvPerms =
891          AccessControlUtil.toUsersAndPermissions(builder.build());
892        // Are there permissions for this user?
893        List<Permission> userPerms = kvPerms.get(user.getShortName());
894        if (userPerms != null) {
895          results.addAll(userPerms);
896        }
897        // Are there permissions for any of the groups this user belongs to?
898        String[] groupNames = user.getGroupNames();
899        if (groupNames != null) {
900          for (String group : groupNames) {
901            List<Permission> groupPerms = kvPerms.get(AuthUtil.toGroupEntry(group));
902            if (results != null) {
903              results.addAll(groupPerms);
904            }
905          }
906        }
907      }
908    }
909    return results;
910  }
911}