/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This chore is responsible for creating replication marker rows: special WALEdits with the
 * family {@link org.apache.hadoop.hbase.wal.WALEdit#METAFAMILY}, the column qualifier
 * {@link WALEdit#REPLICATION_MARKER} and an empty value. If the config key
 * {@link #REPLICATION_MARKER_ENABLED_KEY} is set to true, one marker row is created every
 * {@link #REPLICATION_MARKER_CHORE_DURATION_KEY} ms.
 * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader} will populate
 * the replication marker edit with region_server_name, wal_name and wal_offset encoded in a
 * {@link org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ReplicationMarkerDescriptor}
 * object. {@link org.apache.hadoop.hbase.replication.regionserver.Replication} will change the
 * REPLICATION_SCOPE for this edit to GLOBAL so that it can be replicated. On the sink cluster,
 * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSink} will convert the
 * ReplicationMarkerDescriptor into a Put mutation on the REPLICATION_SINK_TRACKER_TABLE_NAME_STR
 * table.
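 * <p>
 * A minimal, illustrative way to enable the chore via configuration (the values below are
 * examples, not recommendations):
 *
 * <pre>
 * Configuration conf = ...; // region server configuration
 * conf.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); // default: false
 * conf.setInt(REPLICATION_MARKER_CHORE_DURATION_KEY, 60 * 1000); // default: 30 * 1000 ms
 * </pre>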
 */
@InterfaceAudience.Private
public class ReplicationMarkerChore extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationMarkerChore.class);
  private static final MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl();
  public static final RegionInfo REGION_INFO =
    RegionInfoBuilder.newBuilder(REPLICATION_SINK_TRACKER_TABLE_NAME).build();
  private static final String DELIMITER = "_";
  private final RegionServerServices rsServices;
  private WAL wal;

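  /**
   * Config key to enable writing replication marker rows from the region server; see
   * {@link #REPLICATION_MARKER_ENABLED_DEFAULT}.
   */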
  public static final String REPLICATION_MARKER_ENABLED_KEY =
    "hbase.regionserver.replication.marker.enabled";
  public static final boolean REPLICATION_MARKER_ENABLED_DEFAULT = false;

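  /**
   * Config key for the chore period, i.e. the interval in milliseconds between two marker rows;
   * see {@link #REPLICATION_MARKER_CHORE_DURATION_DEFAULT}.
   */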
  public static final String REPLICATION_MARKER_CHORE_DURATION_KEY =
    "hbase.regionserver.replication.marker.chore.duration";
  public static final int REPLICATION_MARKER_CHORE_DURATION_DEFAULT = 30 * 1000; // 30 seconds

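  /**
   * @param stopper    stoppable used to shut down the chore together with the region server
   * @param rsServices region server services, used to look up the WAL and the online regions
   * @param period     how often {@link #chore()} runs, in milliseconds
   */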
  public ReplicationMarkerChore(final Stoppable stopper, final RegionServerServices rsServices,
    int period) {
    super("ReplicationTrackerChore", stopper, period);
    this.rsServices = rsServices;
  }

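  /**
   * Appends a single replication marker edit to this region server's WAL, attached to a randomly
   * chosen online region, and syncs the WAL. The run is skipped when no region is online yet.
   */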
  @Override
  protected void chore() {
    if (wal == null) {
      try {
        // TODO: We need to add support for multi WAL implementation.
        wal = rsServices.getWAL(null);
      } catch (IOException ioe) {
        LOG.warn("Unable to get WAL ", ioe);
        // Shouldn't happen. Ignore and wait for the next chore run.
        return;
      }
    }
    String serverName = rsServices.getServerName().getServerName();
    long timeStamp = EnvironmentEdgeManager.currentTime();
    // Only the timestamp is available here; the remaining ReplicationMarkerDescriptor properties
    // (wal name, region server name and wal offset) are filled in by ReplicationSourceWALReader.
    byte[] rowKey = getRowKey(serverName, timeStamp);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Creating replication marker edit.");
    }

    // This creates a new ArrayList of all the online regions for every call.
    List<? extends Region> regions = rsServices.getRegions();

    if (regions.isEmpty()) {
      LOG.info("There are no online regions for this server, so skipping adding replication marker"
        + " rows for this regionserver");
      return;
    }
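    // Pick a random online region to attach the marker edit to when appending it to the WAL.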
    Region region = regions.get(ThreadLocalRandom.current().nextInt(regions.size()));
    try {
      WALUtil.writeReplicationMarkerAndSync(wal, MVCC, region.getRegionInfo(), rowKey, timeStamp);
    } catch (IOException ioe) {
      LOG.error("Exception while sync'ing replication tracker edit", ioe);
      // TODO: Should we stop the region server, or add a metric and keep going?
    }
  }

  /**
   * Creates a row key from the region server name and timestamp.
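   * <p>
   * For example (illustrative values), a server name of
   * {@code host1.example.com,16020,1700000000000} and a timestamp of {@code 1700000123456} yield
   * the row key {@code host1.example.com,16020,1700000000000_1700000123456}.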
   * @param serverName region server name
   * @param timestamp  timestamp
   * @return row key bytes: the server name and timestamp separated by an underscore
   */
  public static byte[] getRowKey(String serverName, long timestamp) {
    // Convert the timestamp to a string so it stays human-readable when inspecting rows with
    // hbase shell commands.
    String timestampStr = String.valueOf(timestamp);
    final String rowKeyStr = serverName + DELIMITER + timestampStr;
    return Bytes.toBytes(rowKeyStr);
  }
}