001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME; 021 022import java.io.IOException; 023import java.util.List; 024import java.util.concurrent.ThreadLocalRandom; 025import org.apache.hadoop.hbase.ScheduledChore; 026import org.apache.hadoop.hbase.Stoppable; 027import org.apache.hadoop.hbase.client.RegionInfo; 028import org.apache.hadoop.hbase.client.RegionInfoBuilder; 029import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 030import org.apache.hadoop.hbase.regionserver.Region; 031import org.apache.hadoop.hbase.regionserver.RegionServerServices; 032import org.apache.hadoop.hbase.regionserver.wal.WALUtil; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 035import org.apache.hadoop.hbase.wal.WAL; 036import org.apache.hadoop.hbase.wal.WALEdit; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * This chore is responsible to create replication marker rows with special WALEdit with family as 043 * {@link org.apache.hadoop.hbase.wal.WALEdit#METAFAMILY} and column qualifier as 044 * {@link WALEdit#REPLICATION_MARKER} and empty value. If config key 045 * {@link #REPLICATION_MARKER_ENABLED_KEY} is set to true, then we will create 1 marker row every 046 * {@link #REPLICATION_MARKER_CHORE_DURATION_KEY} ms 047 * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader} will populate 048 * the Replication Marker edit with region_server_name, wal_name and wal_offset encoded in 049 * {@link org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ReplicationMarkerDescriptor} 050 * object. {@link org.apache.hadoop.hbase.replication.regionserver.Replication} will change the 051 * REPLICATION_SCOPE for this edit to GLOBAL so that it can replicate. On the sink cluster, 052 * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSink} will convert the 053 * ReplicationMarkerDescriptor into a Put mutation to REPLICATION_SINK_TRACKER_TABLE_NAME_STR table. 054 */ 055@InterfaceAudience.Private 056public class ReplicationMarkerChore extends ScheduledChore { 057 private static final Logger LOG = LoggerFactory.getLogger(ReplicationMarkerChore.class); 058 private static final MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl(); 059 public static final RegionInfo REGION_INFO = 060 RegionInfoBuilder.newBuilder(REPLICATION_SINK_TRACKER_TABLE_NAME).build(); 061 private static final String DELIMITER = "_"; 062 private final RegionServerServices rsServices; 063 private WAL wal; 064 065 public static final String REPLICATION_MARKER_ENABLED_KEY = 066 "hbase.regionserver.replication.marker.enabled"; 067 public static final boolean REPLICATION_MARKER_ENABLED_DEFAULT = false; 068 069 public static final String REPLICATION_MARKER_CHORE_DURATION_KEY = 070 "hbase.regionserver.replication.marker.chore.duration"; 071 public static final int REPLICATION_MARKER_CHORE_DURATION_DEFAULT = 30 * 1000; // 30 seconds 072 073 public ReplicationMarkerChore(final Stoppable stopper, final RegionServerServices rsServices, 074 int period) { 075 super("ReplicationTrackerChore", stopper, period); 076 this.rsServices = rsServices; 077 } 078 079 @Override 080 protected void chore() { 081 if (wal == null) { 082 try { 083 // TODO: We need to add support for multi WAL implementation. 084 wal = rsServices.getWAL(null); 085 } catch (IOException ioe) { 086 LOG.warn("Unable to get WAL ", ioe); 087 // Shouldn't happen. Ignore and wait for the next chore run. 088 return; 089 } 090 } 091 String serverName = rsServices.getServerName().getServerName(); 092 long timeStamp = EnvironmentEdgeManager.currentTime(); 093 // We only have timestamp in ReplicationMarkerDescriptor and the remaining properties walname, 094 // regionserver name and wal offset at ReplicationSourceWALReaderThread. 095 byte[] rowKey = getRowKey(serverName, timeStamp); 096 if (LOG.isTraceEnabled()) { 097 LOG.trace("Creating replication marker edit."); 098 } 099 100 // This creates a new ArrayList of all the online regions for every call. 101 List<? extends Region> regions = rsServices.getRegions(); 102 103 if (regions.isEmpty()) { 104 LOG.info("There are no online regions for this server, so skipping adding replication marker" 105 + " rows for this regionserver"); 106 return; 107 } 108 Region region = regions.get(ThreadLocalRandom.current().nextInt(regions.size())); 109 try { 110 WALUtil.writeReplicationMarkerAndSync(wal, MVCC, region.getRegionInfo(), rowKey, timeStamp); 111 } catch (IOException ioe) { 112 LOG.error("Exception while sync'ing replication tracker edit", ioe); 113 // TODO: Should we stop region server or add a metric and keep going. 114 } 115 } 116 117 /** 118 * Creates a rowkey with region server name and timestamp. 119 * @param serverName region server name 120 * @param timestamp timestamp 121 */ 122 public static byte[] getRowKey(String serverName, long timestamp) { 123 // converting to string since this will help seeing the timestamp in string format using 124 // hbase shell commands. 125 String timestampStr = String.valueOf(timestamp); 126 final String rowKeyStr = serverName + DELIMITER + timestampStr; 127 return Bytes.toBytes(rowKeyStr); 128 } 129}