001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.visibility;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.UUID;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.TimeoutException;
026import org.apache.hadoop.hbase.ArrayBackedTag;
027import org.apache.hadoop.hbase.ExtendedCell;
028import org.apache.hadoop.hbase.PrivateCellUtil;
029import org.apache.hadoop.hbase.Tag;
030import org.apache.hadoop.hbase.TagType;
031import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
032import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
033import org.apache.hadoop.hbase.replication.WALEntryFilter;
034import org.apache.hadoop.hbase.wal.WAL.Entry;
035import org.apache.hadoop.hbase.wal.WALEdit;
036import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041@InterfaceAudience.Private
042public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
043
044  private static final Logger LOG = LoggerFactory.getLogger(VisibilityReplicationEndpoint.class);
045
046  private final ReplicationEndpoint delegator;
047  private final VisibilityLabelService visibilityLabelsService;
048
049  public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint,
050    VisibilityLabelService visibilityLabelsService) {
051    this.delegator = endpoint;
052    this.visibilityLabelsService = visibilityLabelsService;
053  }
054
055  @Override
056  public void init(Context context) throws IOException {
057    delegator.init(context);
058  }
059
060  @Override
061  public void peerConfigUpdated(ReplicationPeerConfig rpc) {
062    delegator.peerConfigUpdated(rpc);
063  }
064
065  @Override
066  public boolean replicate(ReplicateContext replicateContext) {
067    if (!delegator.canReplicateToSameCluster()) {
068      // Only when the replication is inter cluster replication we need to
069      // convert the visibility tags to
070      // string based tags. But for intra cluster replication like region
071      // replicas it is not needed.
072      List<Entry> entries = replicateContext.getEntries();
073      List<Tag> visTags = new ArrayList<>();
074      List<Tag> nonVisTags = new ArrayList<>();
075      List<Entry> newEntries = new ArrayList<>(entries.size());
076      for (Entry entry : entries) {
077        WALEdit newEdit = new WALEdit();
078        List<ExtendedCell> cells = WALEditInternalHelper.getExtendedCells(entry.getEdit());
079        for (ExtendedCell cell : cells) {
080          if (cell.getTagsLength() > 0) {
081            visTags.clear();
082            nonVisTags.clear();
083            Byte serializationFormat =
084              VisibilityUtils.extractAndPartitionTags(cell, visTags, nonVisTags);
085            if (!visTags.isEmpty()) {
086              try {
087                byte[] modifiedVisExpression = visibilityLabelsService
088                  .encodeVisibilityForReplication(visTags, serializationFormat);
089                if (modifiedVisExpression != null) {
090                  nonVisTags
091                    .add(new ArrayBackedTag(TagType.STRING_VIS_TAG_TYPE, modifiedVisExpression));
092                }
093              } catch (Exception ioe) {
094                LOG.error(
095                  "Exception while reading the visibility labels from the cell. The replication "
096                    + "would happen as per the existing format and not as "
097                    + "string type for the cell " + cell + ".",
098                  ioe);
099                // just return the old entries as it is without applying the string type change
100                WALEditInternalHelper.addExtendedCell(newEdit, cell);
101                continue;
102              }
103              // Recreate the cell with the new tags and the existing tags
104              ExtendedCell newCell = PrivateCellUtil.createCell(cell, nonVisTags);
105              WALEditInternalHelper.addExtendedCell(newEdit, newCell);
106            } else {
107              WALEditInternalHelper.addExtendedCell(newEdit, cell);
108            }
109          } else {
110            WALEditInternalHelper.addExtendedCell(newEdit, cell);
111          }
112        }
113        newEntries.add(new Entry((entry.getKey()), newEdit));
114      }
115      replicateContext.setEntries(newEntries);
116      return delegator.replicate(replicateContext);
117    } else {
118      return delegator.replicate(replicateContext);
119    }
120  }
121
122  @Override
123  public synchronized UUID getPeerUUID() {
124    return delegator.getPeerUUID();
125  }
126
127  @Override
128  public boolean canReplicateToSameCluster() {
129    return delegator.canReplicateToSameCluster();
130  }
131
132  @Override
133  public WALEntryFilter getWALEntryfilter() {
134    return delegator.getWALEntryfilter();
135  }
136
137  @Override
138  public boolean isRunning() {
139    return this.delegator.isRunning();
140  }
141
142  @Override
143  public boolean isStarting() {
144    return this.delegator.isStarting();
145  }
146
147  @Override
148  public void start() {
149    this.delegator.start();
150  }
151
152  @Override
153  public void awaitRunning() {
154    this.delegator.awaitRunning();
155  }
156
157  @Override
158  public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
159    this.delegator.awaitRunning(timeout, unit);
160  }
161
162  @Override
163  public void stop() {
164    this.delegator.stop();
165  }
166
167  @Override
168  public void awaitTerminated() {
169    this.delegator.awaitTerminated();
170  }
171
172  @Override
173  public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
174    this.delegator.awaitTerminated(timeout, unit);
175  }
176
177  @Override
178  public Throwable failureCause() {
179    return this.delegator.failureCause();
180  }
181}