001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN;
021import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;
022import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
023import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN;
024import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN;
025import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN;
026import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
027import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
028import static org.junit.Assert.assertEquals;
029import static org.junit.Assert.assertFalse;
030import static org.junit.Assert.assertTrue;
031
032import java.io.IOException;
033import java.util.ArrayList;
034import java.util.List;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HBaseTestingUtility;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.Waiter;
043import org.apache.hadoop.hbase.client.Admin;
044import org.apache.hadoop.hbase.client.Connection;
045import org.apache.hadoop.hbase.client.Result;
046import org.apache.hadoop.hbase.client.ResultScanner;
047import org.apache.hadoop.hbase.client.Scan;
048import org.apache.hadoop.hbase.client.Table;
049import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
050import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
051import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
052import org.apache.hadoop.hbase.testclassification.MediumTests;
053import org.apache.hadoop.hbase.testclassification.ReplicationTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.wal.WAL;
056import org.junit.AfterClass;
057import org.junit.BeforeClass;
058import org.junit.ClassRule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
/**
 * This test creates two mini HBase clusters. The first cluster has the
 * "hbase.regionserver.replication.marker.enabled" conf key enabled, which creates a
 * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore} that generates
 * marker rows to be replicated to the sink cluster. The second cluster has the
 * "hbase.regionserver.replication.sink.tracker.enabled" conf key enabled, which persists the
 * marker rows coming from the peer cluster to the REPLICATION.SINK_TRACKER table.
 */
072@Category({ ReplicationTests.class, MediumTests.class })
073public class TestReplicationMarker {
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076    HBaseClassTestRule.forClass(TestReplicationMarker.class);
077
078  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationMarker.class);
079
080  private static Configuration conf1;
081  private static Configuration conf2;
082  private static HBaseTestingUtility utility1;
083  private static HBaseTestingUtility utility2;
084
085  @BeforeClass
086  public static void setUpBeforeClass() throws Exception {
087    conf1 = HBaseConfiguration.create();
088    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
089    conf2 = new Configuration(conf1);
090    // Run the replication marker chore in cluster1.
091    conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
092    conf1.setLong(REPLICATION_MARKER_CHORE_DURATION_KEY, 1000); // 1 sec
093    utility1 = new HBaseTestingUtility(conf1);
094
095    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
096    // Enable the replication sink tracker for cluster 2
097    conf2.setBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, true);
098    utility2 = new HBaseTestingUtility(conf2);
099
100    // Start cluster 2 first so that hbase:replicationsinktracker table gets created first.
101    utility2.startMiniCluster(1);
102    waitForReplicationTrackerTableCreation();
103
104    // Start cluster1
105    utility1.startMiniCluster(1);
106    Admin admin1 = utility1.getAdmin();
107    ReplicationPeerConfigBuilder rpcBuilder = ReplicationPeerConfig.newBuilder();
108    rpcBuilder.setClusterKey(utility2.getClusterKey());
109    admin1.addReplicationPeer("1", rpcBuilder.build());
110
111    ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0)
112      .getReplicationSourceService().getReplicationManager();
113    // Wait until the peer gets established.
114    Waiter.waitFor(conf1, 10000, (Waiter.Predicate) () -> manager.getSources().size() == 1);
115  }
116
117  private static void waitForReplicationTrackerTableCreation() {
118    Waiter.waitFor(conf2, 10000, (Waiter.Predicate) () -> utility2.getAdmin()
119      .tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
120  }
121
122  @AfterClass
123  public static void tearDown() throws Exception {
124    utility1.shutdownMiniCluster();
125    utility2.shutdownMiniCluster();
126  }
127
128  @Test
129  public void testReplicationMarkerRow() throws Exception {
130    // We have configured ReplicationTrackerChore to run every second. Sleeping so that it will
131    // create enough sentinel rows.
132    Thread.sleep(5000);
133    WAL wal1 = utility1.getHBaseCluster().getRegionServer(0).getWAL(null);
134    String walName1ForCluster1 = ((AbstractFSWAL) wal1).getCurrentFileName().getName();
135    String rs1Name = utility1.getHBaseCluster().getRegionServer(0).getServerName().getHostname();
136    // Since we sync the marker edits while appending to wal, all the edits should be visible
137    // to Replication threads immediately.
138    assertTrue(getReplicatedEntries() >= 5);
139    // Force log roll.
140    wal1.rollWriter(true);
141    String walName2ForCluster1 = ((AbstractFSWAL) wal1).getCurrentFileName().getName();
142    Connection connection2 = utility2.getMiniHBaseCluster().getRegionServer(0).getConnection();
143    // Sleep for 5 more seconds to get marker rows with new wal name.
144    Thread.sleep(5000);
145    // Wait for cluster 2 to have atleast 8 tracker rows from cluster1.
146    utility2.waitFor(5000, () -> getTableCount(connection2) >= 8);
147    // Get replication marker rows from cluster2
148    List<ReplicationSinkTrackerRow> list = getRows(connection2);
149    for (ReplicationSinkTrackerRow desc : list) {
150      // All the tracker rows should have same region server name i.e. rs of cluster1
151      assertEquals(rs1Name, desc.getRegionServerName());
152      // All the tracker rows will have either wal1 or wal2 name.
153      assertTrue(walName1ForCluster1.equals(desc.getWalName())
154        || walName2ForCluster1.equals(desc.getWalName()));
155    }
156
157    // This table shouldn't exist on cluster1 since
158    // hbase.regionserver.replication.sink.tracker.enabled is not enabled on this cluster.
159    assertFalse(utility1.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
160    // This table shouldn't exist on cluster1 since
161    // hbase.regionserver.replication.sink.tracker.enabled is enabled on this cluster.
162    assertTrue(utility2.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
163  }
164
165  /*
166   * Get rows for replication sink tracker table.
167   */
168  private List<ReplicationSinkTrackerRow> getRows(Connection connection) throws IOException {
169    List<ReplicationSinkTrackerRow> list = new ArrayList<>();
170    Scan scan = new Scan();
171    Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME);
172    ResultScanner scanner = table.getScanner(scan);
173
174    Result r;
175    while ((r = scanner.next()) != null) {
176      List<Cell> cells = r.listCells();
177      list.add(getPayload(cells));
178    }
179    return list;
180  }
181
182  private ReplicationSinkTrackerRow getPayload(List<Cell> cells) {
183    String rsName = null, walName = null;
184    Long offset = null;
185    long timestamp = 0L;
186    for (Cell cell : cells) {
187      byte[] qualifier = CellUtil.cloneQualifier(cell);
188      byte[] value = CellUtil.cloneValue(cell);
189
190      if (Bytes.equals(RS_COLUMN, qualifier)) {
191        rsName = Bytes.toString(value);
192      } else if (Bytes.equals(WAL_NAME_COLUMN, qualifier)) {
193        walName = Bytes.toString(value);
194      } else if (Bytes.equals(TIMESTAMP_COLUMN, qualifier)) {
195        timestamp = Bytes.toLong(value);
196      } else if (Bytes.equals(OFFSET_COLUMN, qualifier)) {
197        offset = Bytes.toLong(value);
198      }
199    }
200    ReplicationSinkTrackerRow row =
201      new ReplicationSinkTrackerRow(rsName, walName, timestamp, offset);
202    return row;
203  }
204
205  static class ReplicationSinkTrackerRow {
206    private String region_server_name;
207    private String wal_name;
208    private long timestamp;
209    private long offset;
210
211    public ReplicationSinkTrackerRow(String region_server_name, String wal_name, long timestamp,
212      long offset) {
213      this.region_server_name = region_server_name;
214      this.wal_name = wal_name;
215      this.timestamp = timestamp;
216      this.offset = offset;
217    }
218
219    public String getRegionServerName() {
220      return region_server_name;
221    }
222
223    public String getWalName() {
224      return wal_name;
225    }
226
227    public long getTimestamp() {
228      return timestamp;
229    }
230
231    public long getOffset() {
232      return offset;
233    }
234
235    @Override
236    public String toString() {
237      return "ReplicationSinkTrackerRow{" + "region_server_name='" + region_server_name + '\''
238        + ", wal_name='" + wal_name + '\'' + ", timestamp=" + timestamp + ", offset=" + offset
239        + '}';
240    }
241  }
242
243  private int getTableCount(Connection connection) throws Exception {
244    Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME);
245    ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM));
246    int count = 0;
247    while (resultScanner.next() != null) {
248      count++;
249    }
250    LOG.info("Table count: " + count);
251    return count;
252  }
253
254  /*
255   * Return replicated entries from cluster1.
256   */
257  private long getReplicatedEntries() {
258    ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0)
259      .getReplicationSourceService().getReplicationManager();
260    List<ReplicationSourceInterface> sources = manager.getSources();
261    assertEquals(1, sources.size());
262    ReplicationSource source = (ReplicationSource) sources.get(0);
263    return source.getTotalReplicatedEdits();
264  }
265}