001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security.visibility; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.UUID; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.TimeoutException; 026import org.apache.hadoop.hbase.ArrayBackedTag; 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.PrivateCellUtil; 029import org.apache.hadoop.hbase.Tag; 030import org.apache.hadoop.hbase.TagType; 031import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 032import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 033import org.apache.hadoop.hbase.replication.WALEntryFilter; 034import org.apache.hadoop.hbase.wal.WAL.Entry; 035import org.apache.hadoop.hbase.wal.WALEdit; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040@InterfaceAudience.Private 041public class VisibilityReplicationEndpoint implements ReplicationEndpoint { 042 043 private static final Logger LOG = LoggerFactory.getLogger(VisibilityReplicationEndpoint.class); 044 045 private final ReplicationEndpoint delegator; 046 private final VisibilityLabelService visibilityLabelsService; 047 048 public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint, 049 VisibilityLabelService visibilityLabelsService) { 050 this.delegator = endpoint; 051 this.visibilityLabelsService = visibilityLabelsService; 052 } 053 054 @Override 055 public void init(Context context) throws IOException { 056 delegator.init(context); 057 } 058 059 @Override 060 public void peerConfigUpdated(ReplicationPeerConfig rpc) { 061 delegator.peerConfigUpdated(rpc); 062 } 063 064 @Override 065 public boolean replicate(ReplicateContext replicateContext) { 066 if (!delegator.canReplicateToSameCluster()) { 067 // Only when the replication is inter cluster replication we need to 068 // convert the visibility tags to 069 // string based tags. But for intra cluster replication like region 070 // replicas it is not needed. 071 List<Entry> entries = replicateContext.getEntries(); 072 List<Tag> visTags = new ArrayList<>(); 073 List<Tag> nonVisTags = new ArrayList<>(); 074 List<Entry> newEntries = new ArrayList<>(entries.size()); 075 for (Entry entry : entries) { 076 WALEdit newEdit = new WALEdit(); 077 ArrayList<Cell> cells = entry.getEdit().getCells(); 078 for (Cell cell : cells) { 079 if (cell.getTagsLength() > 0) { 080 visTags.clear(); 081 nonVisTags.clear(); 082 Byte serializationFormat = 083 VisibilityUtils.extractAndPartitionTags(cell, visTags, nonVisTags); 084 if (!visTags.isEmpty()) { 085 try { 086 byte[] modifiedVisExpression = visibilityLabelsService 087 .encodeVisibilityForReplication(visTags, serializationFormat); 088 if (modifiedVisExpression != null) { 089 nonVisTags 090 .add(new ArrayBackedTag(TagType.STRING_VIS_TAG_TYPE, modifiedVisExpression)); 091 } 092 } catch (Exception ioe) { 093 LOG.error( 094 "Exception while reading the visibility labels from the cell. The replication " 095 + "would happen as per the existing format and not as " 096 + "string type for the cell " + cell + ".", 097 ioe); 098 // just return the old entries as it is without applying the string type change 099 newEdit.add(cell); 100 continue; 101 } 102 // Recreate the cell with the new tags and the existing tags 103 Cell newCell = PrivateCellUtil.createCell(cell, nonVisTags); 104 newEdit.add(newCell); 105 } else { 106 newEdit.add(cell); 107 } 108 } else { 109 newEdit.add(cell); 110 } 111 } 112 newEntries.add(new Entry((entry.getKey()), newEdit)); 113 } 114 replicateContext.setEntries(newEntries); 115 return delegator.replicate(replicateContext); 116 } else { 117 return delegator.replicate(replicateContext); 118 } 119 } 120 121 @Override 122 public synchronized UUID getPeerUUID() { 123 return delegator.getPeerUUID(); 124 } 125 126 @Override 127 public boolean canReplicateToSameCluster() { 128 return delegator.canReplicateToSameCluster(); 129 } 130 131 @Override 132 public WALEntryFilter getWALEntryfilter() { 133 return delegator.getWALEntryfilter(); 134 } 135 136 @Override 137 public boolean isRunning() { 138 return this.delegator.isRunning(); 139 } 140 141 @Override 142 public boolean isStarting() { 143 return this.delegator.isStarting(); 144 } 145 146 @Override 147 public void start() { 148 this.delegator.start(); 149 } 150 151 @Override 152 public void awaitRunning() { 153 this.delegator.awaitRunning(); 154 } 155 156 @Override 157 public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 158 this.delegator.awaitRunning(timeout, unit); 159 } 160 161 @Override 162 public void stop() { 163 this.delegator.stop(); 164 } 165 166 @Override 167 public void awaitTerminated() { 168 this.delegator.awaitTerminated(); 169 } 170 171 @Override 172 public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 173 this.delegator.awaitTerminated(timeout, unit); 174 } 175 176 @Override 177 public Throwable failureCause() { 178 return this.delegator.failureCause(); 179 } 180}