001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import java.util.Collections; 025import java.util.List; 026import java.util.Optional; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.Admin; 034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 035import org.apache.hadoop.hbase.client.Connection; 036import org.apache.hadoop.hbase.client.ConnectionFactory; 037import org.apache.hadoop.hbase.client.Get; 038import org.apache.hadoop.hbase.client.Mutation; 039import org.apache.hadoop.hbase.client.Put; 040import org.apache.hadoop.hbase.client.Result; 041import org.apache.hadoop.hbase.client.Table; 042import org.apache.hadoop.hbase.client.TableDescriptor; 043import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 044import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; 045import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 046import org.apache.hadoop.hbase.coprocessor.ObserverContext; 047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 049import org.apache.hadoop.hbase.coprocessor.RegionObserver; 050import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; 051import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; 052import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; 053import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 054import org.apache.hadoop.hbase.testclassification.MediumTests; 055import org.apache.hadoop.hbase.testclassification.ReplicationTests; 056import org.apache.hadoop.hbase.util.Bytes; 057import org.apache.hadoop.hbase.wal.WALEdit; 058import org.apache.hadoop.hbase.wal.WALKey; 059import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 060import org.junit.AfterClass; 061import org.junit.BeforeClass; 062import org.junit.ClassRule; 063import org.junit.Test; 064import org.junit.experimental.categories.Category; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 069 070import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 072 073@Category({ ReplicationTests.class, MediumTests.class }) 074public class TestReplicationWithWALExtendedAttributes { 075 076 @ClassRule 077 public static final HBaseClassTestRule CLASS_RULE = 078 HBaseClassTestRule.forClass(TestReplicationWithWALExtendedAttributes.class); 079 080 private static final Logger LOG = 081 LoggerFactory.getLogger(TestReplicationWithWALExtendedAttributes.class); 082 083 private static Configuration conf1 = HBaseConfiguration.create(); 084 085 private static Admin replicationAdmin; 086 087 private static Connection connection1; 088 089 private static Table htable1; 090 private static Table htable2; 091 092 private static HBaseTestingUtil utility1; 093 private static HBaseTestingUtil utility2; 094 private static final long SLEEP_TIME = 500; 095 private static final int NB_RETRIES = 10; 096 097 private static final TableName TABLE_NAME = TableName.valueOf("TestReplicationWithWALAnnotation"); 098 private static final byte[] FAMILY = Bytes.toBytes("f"); 099 private static final byte[] ROW = Bytes.toBytes("row"); 100 private static final byte[] ROW2 = Bytes.toBytes("row2"); 101 102 @BeforeClass 103 public static void setUpBeforeClass() throws Exception { 104 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); 105 conf1.setInt("replication.source.size.capacity", 10240); 106 conf1.setLong("replication.source.sleepforretries", 100); 107 conf1.setInt("hbase.regionserver.maxlogs", 10); 108 conf1.setLong("hbase.master.logcleaner.ttl", 10); 109 conf1.setInt("zookeeper.recovery.retry", 1); 110 conf1.setInt("zookeeper.recovery.retry.intervalmill", 10); 111 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 112 conf1.setInt("replication.stats.thread.period.seconds", 5); 113 conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false); 114 conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName()); 115 conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 116 TestCoprocessorForWALAnnotationAtSource.class.getName()); 117 118 utility1 = new HBaseTestingUtil(conf1); 119 utility1.startMiniZKCluster(); 120 MiniZooKeeperCluster miniZK = utility1.getZkCluster(); 121 // Have to reget conf1 in case zk cluster location different 122 // than default 123 conf1 = utility1.getConfiguration(); 124 LOG.info("Setup first Zk"); 125 126 // Base conf2 on conf1 so it gets the right zk cluster. 127 Configuration conf2 = HBaseConfiguration.create(conf1); 128 conf2.setInt("hfile.format.version", 3); 129 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 130 conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); 131 conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); 132 conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName()); 133 conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 134 TestCoprocessorForWALAnnotationAtSink.class.getName()); 135 conf2.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, 136 TestReplicationSinkRegionServerEndpoint.class.getName()); 137 138 utility2 = new HBaseTestingUtil(conf2); 139 utility2.setZkCluster(miniZK); 140 141 LOG.info("Setup second Zk"); 142 utility1.startMiniCluster(2); 143 utility2.startMiniCluster(2); 144 145 connection1 = ConnectionFactory.createConnection(conf1); 146 replicationAdmin = connection1.getAdmin(); 147 ReplicationPeerConfig rpc = 148 ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI()).build(); 149 replicationAdmin.addReplicationPeer("2", rpc); 150 151 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME) 152 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3) 153 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 154 .build(); 155 try (Connection conn = ConnectionFactory.createConnection(conf1); 156 Admin admin = conn.getAdmin()) { 157 admin.createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 158 } 159 try (Connection conn = ConnectionFactory.createConnection(conf2); 160 Admin admin = conn.getAdmin()) { 161 admin.createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 162 } 163 htable1 = utility1.getConnection().getTable(TABLE_NAME); 164 htable2 = utility2.getConnection().getTable(TABLE_NAME); 165 } 166 167 @AfterClass 168 public static void tearDownAfterClass() throws Exception { 169 Closeables.close(replicationAdmin, true); 170 Closeables.close(connection1, true); 171 utility2.shutdownMiniCluster(); 172 utility1.shutdownMiniCluster(); 173 } 174 175 @Test 176 public void testReplicationWithWALExtendedAttributes() throws Exception { 177 Put put = new Put(ROW); 178 put.addColumn(FAMILY, ROW, ROW); 179 180 htable1 = utility1.getConnection().getTable(TABLE_NAME); 181 htable1.put(put); 182 183 Put put2 = new Put(ROW2); 184 put2.addColumn(FAMILY, ROW2, ROW2); 185 186 htable1.batch(Collections.singletonList(put2), new Object[1]); 187 188 assertGetValues(new Get(ROW), ROW); 189 assertGetValues(new Get(ROW2), ROW2); 190 } 191 192 private static void assertGetValues(Get get, byte[] value) 193 throws IOException, InterruptedException { 194 for (int i = 0; i < NB_RETRIES; i++) { 195 if (i == NB_RETRIES - 1) { 196 fail("Waited too much time for put replication"); 197 } 198 Result res = htable2.get(get); 199 if (res.isEmpty()) { 200 LOG.info("Row not available"); 201 Thread.sleep(SLEEP_TIME); 202 } else { 203 assertArrayEquals(value, res.value()); 204 break; 205 } 206 } 207 } 208 209 public static class TestCoprocessorForWALAnnotationAtSource 210 implements RegionCoprocessor, RegionObserver { 211 212 @Override 213 public Optional<RegionObserver> getRegionObserver() { 214 return Optional.of(this); 215 } 216 217 @Override 218 public void preWALAppend(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, 219 WALKey key, WALEdit edit) throws IOException { 220 key.addExtendedAttribute("extendedAttr1", Bytes.toBytes("Value of Extended attribute 01")); 221 key.addExtendedAttribute("extendedAttr2", Bytes.toBytes("Value of Extended attribute 02")); 222 } 223 } 224 225 public static class TestCoprocessorForWALAnnotationAtSink 226 implements RegionCoprocessor, RegionObserver { 227 228 @Override 229 public Optional<RegionObserver> getRegionObserver() { 230 return Optional.of(this); 231 } 232 233 @Override 234 public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> c, Put put, 235 WALEdit edit) throws IOException { 236 String attrVal1 = Bytes.toString(put.getAttribute("extendedAttr1")); 237 String attrVal2 = Bytes.toString(put.getAttribute("extendedAttr2")); 238 if (attrVal1 == null || attrVal2 == null) { 239 throw new IOException("Failed to retrieve WAL annotations"); 240 } 241 if ( 242 attrVal1.equals("Value of Extended attribute 01") 243 && attrVal2.equals("Value of Extended attribute 02") 244 ) { 245 return; 246 } 247 throw new IOException("Failed to retrieve WAL annotations.."); 248 } 249 250 @Override 251 public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 252 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 253 String attrVal1 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr1")); 254 String attrVal2 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr2")); 255 if (attrVal1 == null || attrVal2 == null) { 256 throw new IOException("Failed to retrieve WAL annotations"); 257 } 258 if ( 259 attrVal1.equals("Value of Extended attribute 01") 260 && attrVal2.equals("Value of Extended attribute 02") 261 ) { 262 return; 263 } 264 throw new IOException("Failed to retrieve WAL annotations.."); 265 } 266 } 267 268 public static final class TestReplicationSinkRegionServerEndpoint 269 implements RegionServerCoprocessor, RegionServerObserver { 270 271 @Override 272 public Optional<RegionServerObserver> getRegionServerObserver() { 273 return Optional.of(this); 274 } 275 276 @Override 277 public void preReplicationSinkBatchMutate( 278 ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry, 279 Mutation mutation) throws IOException { 280 RegionServerObserver.super.preReplicationSinkBatchMutate(ctx, walEntry, mutation); 281 List<WALProtos.Attribute> attributeList = walEntry.getKey().getExtendedAttributesList(); 282 attachWALExtendedAttributesToMutation(mutation, attributeList); 283 } 284 285 @Override 286 public void postReplicationSinkBatchMutate( 287 ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry, 288 Mutation mutation) throws IOException { 289 RegionServerObserver.super.postReplicationSinkBatchMutate(ctx, walEntry, mutation); 290 LOG.info("WALEntry extended attributes: {}", walEntry.getKey().getExtendedAttributesList()); 291 LOG.info("Mutation attributes: {}", mutation.getAttributesMap()); 292 } 293 294 private void attachWALExtendedAttributesToMutation(Mutation mutation, 295 List<WALProtos.Attribute> attributeList) { 296 if (attributeList != null) { 297 for (WALProtos.Attribute attribute : attributeList) { 298 mutation.setAttribute(attribute.getKey(), attribute.getValue().toByteArray()); 299 } 300 } 301 } 302 } 303 304}