/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This test creates 2 mini HBase clusters. The first cluster has the
 * "hbase.regionserver.replication.marker.enabled" conf key enabled. This will create
 * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore} which will create
 * marker rows to be replicated to the sink cluster. The second cluster has the
 * "hbase.regionserver.replication.sink.tracker.enabled" conf key enabled. This will persist the
 * marker rows coming from the peer cluster to the REPLICATION.SINK_TRACKER table.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationMarker {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationMarker.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationMarker.class);

  private static Configuration conf1;
  private static Configuration conf2;
  private static HBaseTestingUtil utility1;
  private static HBaseTestingUtil utility2;

  /**
   * Starts both mini clusters (cluster 2 first), registers cluster 2 as replication peer "1" of
   * cluster 1 and waits until cluster 1's region server reports exactly one replication source.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf1 = HBaseConfiguration.create();
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
    conf2 = new Configuration(conf1);
    // Run the replication marker chore in cluster1.
    conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
    conf1.setLong(REPLICATION_MARKER_CHORE_DURATION_KEY, 1000); // 1 sec
    utility1 = new HBaseTestingUtil(conf1);

    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    // Enable the replication sink tracker for cluster 2
    conf2.setBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, true);
    utility2 = new HBaseTestingUtil(conf2);

    // Start cluster 2 first so that the replication sink tracker table gets created first.
    utility2.startMiniCluster(1);
    waitForReplicationTrackerTableCreation();

    // Start cluster1
    utility1.startMiniCluster(1);
    Admin admin1 = utility1.getAdmin();
    ReplicationPeerConfigBuilder rpcBuilder = ReplicationPeerConfig.newBuilder();
    rpcBuilder.setClusterKey(utility2.getRpcConnnectionURI());
    admin1.addReplicationPeer("1", rpcBuilder.build());

    ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0)
      .getReplicationSourceService().getReplicationManager();
    // Wait until the peer gets established.
    Waiter.waitFor(conf1, 10000,
      (Waiter.Predicate<Exception>) () -> manager.getSources().size() == 1);
  }

  /**
   * Waits (up to 10 seconds) until the replication sink tracker table exists on cluster 2.
   */
  private static void waitForReplicationTrackerTableCreation() {
    Waiter.waitFor(conf2, 10000, (Waiter.Predicate<Exception>) () -> utility2.getAdmin()
      .tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
  }

  /**
   * Shuts down both mini clusters. Cluster 2 is shut down even if shutting down cluster 1 throws.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    try {
      utility1.shutdownMiniCluster();
    } finally {
      utility2.shutdownMiniCluster();
    }
  }

  /**
   * Verifies that marker rows produced on cluster 1 are replicated to cluster 2's sink tracker
   * table, that every tracker row names cluster 1's region server and one of the two WAL files
   * observed around the forced log roll, and that the tracker table exists only on cluster 2.
   */
  @Test
  public void testReplicationMarkerRow() throws Exception {
    // We have configured ReplicationMarkerChore to run every second. Sleeping so that it will
    // create enough sentinel rows.
    Thread.sleep(5000);
    WAL wal1 = utility1.getHBaseCluster().getRegionServer(0).getWAL(null);
    String walName1ForCluster1 = ((AbstractFSWAL<?>) wal1).getCurrentFileName().getName();
    String rs1Name = utility1.getHBaseCluster().getRegionServer(0).getServerName().getHostname();
    // Since we sync the marker edits while appending to wal, all the edits should be visible
    // to Replication threads immediately.
    assertTrue(getReplicatedEntries() >= 5);
    // Force log roll.
    wal1.rollWriter(true);
    String walName2ForCluster1 = ((AbstractFSWAL<?>) wal1).getCurrentFileName().getName();
    Connection connection2 = utility2.getMiniHBaseCluster().getRegionServer(0).getConnection();
    // Sleep for 5 more seconds to get marker rows with new wal name.
    Thread.sleep(5000);
    // Wait for cluster 2 to have at least 8 tracker rows from cluster1.
    utility2.waitFor(5000, () -> getTableCount(connection2) >= 8);
    // Get replication marker rows from cluster2
    List<ReplicationSinkTrackerRow> list = getRows(connection2);
    for (ReplicationSinkTrackerRow desc : list) {
      // All the tracker rows should have same region server name i.e. rs of cluster1
      assertEquals(rs1Name, desc.getRegionServerName());
      // All the tracker rows will have either wal1 or wal2 name.
      assertTrue(walName1ForCluster1.equals(desc.getWalName())
        || walName2ForCluster1.equals(desc.getWalName()));
    }

    // This table shouldn't exist on cluster1 since
    // hbase.regionserver.replication.sink.tracker.enabled is not enabled on this cluster.
    assertFalse(utility1.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
    // This table should exist on cluster2 since
    // hbase.regionserver.replication.sink.tracker.enabled is enabled on this cluster.
    assertTrue(utility2.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
  }

  /**
   * Scans the whole replication sink tracker table and converts every result into a
   * {@link ReplicationSinkTrackerRow}.
   * @param connection connection used to open the tracker table
   * @return one entry per scanned row, in scan order
   * @throws IOException if opening the table or scanning it fails
   */
  private List<ReplicationSinkTrackerRow> getRows(Connection connection) throws IOException {
    List<ReplicationSinkTrackerRow> list = new ArrayList<>();
    // Close both the scanner and the table once the scan is done so they are not leaked.
    try (Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME);
      ResultScanner scanner = table.getScanner(new Scan())) {
      Result r;
      while ((r = scanner.next()) != null) {
        list.add(getPayload(r.listCells()));
      }
    }
    return list;
  }

  /**
   * Builds a {@link ReplicationSinkTrackerRow} from the cells of one tracker row by matching each
   * cell's qualifier against the known tracker columns. Cells with other qualifiers are ignored.
   * Missing string columns stay {@code null}; missing numeric columns default to {@code 0}.
   * @param cells cells of a single tracker row
   * @return the decoded row
   */
  private ReplicationSinkTrackerRow getPayload(List<Cell> cells) {
    String rsName = null;
    String walName = null;
    long timestamp = 0L;
    // Primitive with a default (like timestamp) so that a row without the offset column does not
    // trigger a NullPointerException when unboxing into the constructor argument.
    long offset = 0L;
    for (Cell cell : cells) {
      byte[] qualifier = CellUtil.cloneQualifier(cell);
      byte[] value = CellUtil.cloneValue(cell);

      if (Bytes.equals(RS_COLUMN, qualifier)) {
        rsName = Bytes.toString(value);
      } else if (Bytes.equals(WAL_NAME_COLUMN, qualifier)) {
        walName = Bytes.toString(value);
      } else if (Bytes.equals(TIMESTAMP_COLUMN, qualifier)) {
        timestamp = Bytes.toLong(value);
      } else if (Bytes.equals(OFFSET_COLUMN, qualifier)) {
        offset = Bytes.toLong(value);
      }
    }
    return new ReplicationSinkTrackerRow(rsName, walName, timestamp, offset);
  }

  /**
   * Immutable holder for the decoded columns of one replication sink tracker row.
   */
  static class ReplicationSinkTrackerRow {
    private final String regionServerName;
    private final String walName;
    private final long timestamp;
    private final long offset;

    public ReplicationSinkTrackerRow(String regionServerName, String walName, long timestamp,
      long offset) {
      this.regionServerName = regionServerName;
      this.walName = walName;
      this.timestamp = timestamp;
      this.offset = offset;
    }

    public String getRegionServerName() {
      return regionServerName;
    }

    public String getWalName() {
      return walName;
    }

    public long getTimestamp() {
      return timestamp;
    }

    public long getOffset() {
      return offset;
    }

    @Override
    public String toString() {
      return "ReplicationSinkTrackerRow{" + "region_server_name='" + regionServerName + '\''
        + ", wal_name='" + walName + '\'' + ", timestamp=" + timestamp + ", offset=" + offset
        + '}';
    }
  }

  /**
   * Counts the rows currently present in the replication sink tracker table using a streaming
   * scan, and logs the count.
   * @param connection connection used to open the tracker table
   * @return number of rows returned by the scan
   */
  private int getTableCount(Connection connection) throws Exception {
    // This method is polled repeatedly from waitFor, so make sure every scanner/table is closed.
    try (Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME);
      ResultScanner resultScanner =
        table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM))) {
      int count = 0;
      while (resultScanner.next() != null) {
        count++;
      }
      LOG.info("Table count: {}", count);
      return count;
    }
  }

  /**
   * Returns the total replicated edits reported by the single replication source of cluster 1's
   * first region server. Asserts that exactly one source exists.
   */
  private long getReplicatedEntries() {
    ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0)
      .getReplicationSourceService().getReplicationManager();
    List<ReplicationSourceInterface> sources = manager.getSources();
    assertEquals(1, sources.size());
    ReplicationSource source = (ReplicationSource) sources.get(0);
    return source.getTotalReplicatedEdits();
  }
}