/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat;
import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat.ReplicationBarrierResult;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;

/**
 * Helper class that decides whether a given WAL entry can be pushed without breaking the
 * replication order. An instance is designed to be used by a single
 * {@link ReplicationSourceWALReader}, so it is not thread safe.
 * <p>
 * Every open sequence number of a region is recorded in a special family in meta, called
 * 'rep_barrier', which gives a sequence of open sequence numbers (b1, b2, b3, ...). We call
 * [bn, bn+1) a range, and a region is always on the same RS within one range.
 * </p>
 * <p>
 * On split and merge, the parent(s) of the generated region(s) are also recorded in that special
 * family in meta. In addition, an extra 'open sequence number' is written for the parent region(s),
 * which is the max sequence id of the region plus one.
 * </p>
 * <p>
 * For each peer, the last pushed sequence id of each region is recorded. It is managed by the
 * replication storage.
 * </p>
 * <p>
 * The algorithm works like this:
 * </p>
 * <ol>
 * <li>Locate the sequence id we want to push among the barriers.</li>
 * <li>If it is before the first barrier, we are safe to push. This usually happens because serial
 * replication was enabled for the table after the table was created and data was written to
 * it.</li>
 * <li>In general, if the previous range is finished, we are safe to push. Determining whether a
 * range is finished is straightforward: check whether the last pushed sequence id has reached the
 * end barrier of the range minus 1. There are several exceptions:
 * <ul>
 * <li>If the entry is in the first range, we need to check whether there are parent regions. If
 * so, we need to make sure that the data of the parent regions has all been pushed.</li>
 * <li>If the entry is in the last range, we need to check the region state. If the state is
 * OPENING, we are not safe to push. The reason is that, before calling reportRIT to master (which
 * updates the open sequence number in the meta table), an open region event marker is written to
 * the WAL first, and its sequence id is greater than the newest open sequence number (which has
 * not been written to the meta table yet, so we do not know it). In this scenario the WAL entry
 * for the open region event marker actually belongs to the range after the 'last' range, so we
 * are not safe to push it. Otherwise the last pushed sequence id would be updated to this value
 * and we would then think the previous range has already been finished, which is not true.</li>
 * <li>Notice that the two exceptions above do not conflict, since the first range can also be the
 * last range if there is only one range.</li>
 * </ul>
 * </li>
 * </ol>
 * <p>
 * For performance reasons we do not want to check meta for every WAL entry, so two in-memory maps
 * are introduced. The idea is simple:
 * </p>
 * <ul>
 * <li>If a range can be pushed, put its end barrier into the {@code canPushUnder} map.</li>
 * <li>Before accessing meta, first check the sequence id stored in the {@code canPushUnder} map.
 * If the sequence id of the WAL entry is less than the one stored in the {@code canPushUnder} map,
 * we are safe to push.</li>
 * </ul>
 * <p>
 * The last range has no end barrier, so the continuity of sequence ids is used to determine
 * whether we can push. The rule is:
 * </p>
 * <ul>
 * <li>When an entry is able to be pushed, put its sequence id into the {@code pushed} map.</li>
 * <li>Check whether the sequence id of the WAL entry equals the one stored in the {@code pushed}
 * map plus one. If so, we are safe to push, and the {@code pushed} map is updated with the
 * sequence id of the WAL entry.</li>
 * </ul>
 */
@InterfaceAudience.Private
class SerialReplicationChecker {

  private static final Logger LOG = LoggerFactory.getLogger(SerialReplicationChecker.class);

  public static final String REPLICATION_SERIALLY_WAITING_KEY =
    "hbase.serial.replication.waiting.ms";
  public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;

  private final String peerId;

  private final ReplicationQueueStorage storage;

  private final Connection conn;

  // How long waitUntilCanPush sleeps between two attempts, in milliseconds.
  private final long waitTimeMs;

  // Last sequence id recorded as pushed, keyed by encoded region name. A missing key is loaded as
  // NO_SEQNUM. Entries expire one day after their last access.
  private final LoadingCache<String, MutableLong> pushed = CacheBuilder.newBuilder()
    .expireAfterAccess(1, TimeUnit.DAYS).build(new CacheLoader<String, MutableLong>() {

      @Override
      public MutableLong load(String key) throws Exception {
        return new MutableLong(HConstants.NO_SEQNUM);
      }
    });

  // End barrier under which entries may be pushed without consulting meta, keyed by encoded region
  // name. A guava cache is used so that each key gets a ttl.
  private final Cache<String, Long> canPushUnder =
    CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.DAYS).build();

  /**
   * Creates a checker bound to the peer, queue storage and connection of the given source.
   * @param conf   configuration from which {@link #REPLICATION_SERIALLY_WAITING_KEY} is read
   * @param source the replication source this checker works for
   */
  public SerialReplicationChecker(Configuration conf, ReplicationSource source) {
    this.peerId = source.getPeerId();
    this.storage = source.getReplicationQueueStorage();
    this.conn = source.getServer().getConnection();
    this.waitTimeMs =
      conf.getLong(REPLICATION_SERIALLY_WAITING_KEY, REPLICATION_SERIALLY_WAITING_DEFAULT);
  }

  /**
   * Tells whether the range ending at {@code endBarrier} has been completely pushed for this peer.
   * @throws IOException if the last pushed sequence id can not be read from the storage
   */
  private boolean isRangeFinished(long endBarrier, String encodedRegionName) throws IOException {
    try {
      long lastPushedSeqId = storage.getLastSequenceId(encodedRegionName, peerId);
      // The end barrier is an open sequence number, i.e. the old max sequence id plus one, hence
      // the range is done once the last pushed id has reached endBarrier - 1.
      return lastPushedSeqId >= endBarrier - 1;
    } catch (ReplicationException e) {
      throw new IOException(
        "Failed to get pushed sequence id for " + encodedRegionName + ", peer " + peerId, e);
    }
  }

  /**
   * Tells whether everything up to the last barrier of the given parent region has been pushed. A
   * parent without any barrier is treated as finished.
   */
  private boolean isParentFinished(byte[] regionName) throws IOException {
    long[] parentBarriers = ReplicationBarrierFamilyFormat.getReplicationBarriers(conn, regionName);
    if (parentBarriers.length > 0) {
      long lastBarrier = parentBarriers[parentBarriers.length - 1];
      return isRangeFinished(lastBarrier, RegionInfo.encodeRegionName(regionName));
    }
    return true;
  }

  /**
   * Tells whether {@code index} denotes the last range while the region is in OPENING state.
   * <p>
   * An open region marker may be written to the WAL before the open sequence number is written to
   * meta, so in this situation it is not safe to push even if the previous range is finished.
   */
  private boolean isLastRangeAndOpening(ReplicationBarrierResult barrierResult, int index) {
    if (index != barrierResult.getBarriers().length) {
      return false;
    }
    return barrierResult.getState() == RegionState.State.OPENING;
  }

  /**
   * Remembers that {@code seqId} is pushable: caches the end barrier of its range, if the range
   * has one, and stores the sequence id as the last pushed one for the region.
   */
  private void recordCanPush(String encodedNameAsString, long seqId, long[] barriers, int index) {
    boolean hasEndBarrier = index < barriers.length;
    if (hasEndBarrier) {
      canPushUnder.put(encodedNameAsString, barriers[index]);
    }
    pushed.getUnchecked(encodedNameAsString).setValue(seqId);
  }

  /**
   * Slow path: fetches the replication barriers and decides from them whether the entry can be
   * pushed.
   */
  private boolean canPush(Entry entry, byte[] row) throws IOException {
    String encodedNameAsString = Bytes.toString(entry.getKey().getEncodedRegionName());
    long seqId = entry.getKey().getSequenceId();
    ReplicationBarrierResult barrierResult =
      ReplicationBarrierFamilyFormat.getReplicationBarrierResult(conn,
        entry.getKey().getTableName(), row, entry.getKey().getEncodedRegionName());
    LOG.debug("Replication barrier for {}: {}", entry, barrierResult);
    long[] barriers = barrierResult.getBarriers();
    int searchResult = Arrays.binarySearch(barriers, seqId);
    if (searchResult == -1) {
      LOG.debug("{} is before the first barrier, pass", entry);
      // We are in the range before the first recorded openSeqNum. This usually means the wal was
      // written before serial replication was enabled for this table. Just let it pass since we
      // can not guarantee the order.
      pushed.getUnchecked(encodedNameAsString).setValue(seqId);
      return true;
    }
    // Ranges are left closed and right open. Of the two possible normalizations (decrease the
    // missed insertion point so indexes start from 0, or increase the hit position so indexes
    // start from 1) the latter is used, so rangeIndex is the position of the range's end barrier.
    int rangeIndex = searchResult >= 0 ? searchResult + 1 : -searchResult - 1;
    boolean inFirstRange = rangeIndex == 1;
    if (inFirstRange) {
      // There is no previous range, but parents (if any) must have been fully pushed.
      for (byte[] parentRegionName : barrierResult.getParentRegionNames()) {
        if (!isParentFinished(parentRegionName)) {
          LOG.debug("Parent {} has not been finished yet for entry {}, give up",
            Bytes.toStringBinary(parentRegionName), entry);
          return false;
        }
      }
    } else if (!isRangeFinished(barriers[rangeIndex - 1], encodedNameAsString)) {
      // The previous range still has data which is not pushed.
      LOG.debug("Previous range for {} has not been finished yet, give up", entry);
      return false;
    }
    if (isLastRangeAndOpening(barrierResult, rangeIndex)) {
      LOG.debug("{} is in the last range and the region is opening, give up", entry);
      return false;
    }
    if (inFirstRange) {
      LOG.debug("{} is in the first range, pass", entry);
    } else {
      LOG.debug("The previous range for {} has been finished, pass", entry);
    }
    recordCanPush(encodedNameAsString, seqId, barriers, rangeIndex);
    return true;
  }

  /**
   * Tells whether the given entry can be pushed now. The in-memory caches are consulted first and
   * the barriers are only looked up when neither of them can answer.
   * @param entry           the WAL entry to check
   * @param firstCellInEdit the first cell of the entry's edit; its row is used for the barrier
   *                        lookup
   * @return {@code true} if the entry can be pushed, {@code false} otherwise
   * @throws IOException if the barrier or the last pushed sequence id lookup fails
   */
  public boolean canPush(Entry entry, Cell firstCellInEdit) throws IOException {
    String encodedNameAsString = Bytes.toString(entry.getKey().getEncodedRegionName());
    long seqId = entry.getKey().getSequenceId();
    Long cachedEndBarrier = canPushUnder.getIfPresent(encodedNameAsString);
    if (cachedEndBarrier != null) {
      if (seqId < cachedEndBarrier.longValue()) {
        LOG.trace("{} is before the end barrier {}, pass", entry, cachedEndBarrier);
        return true;
      }
      LOG.debug("{} is beyond the previous end barrier {}, remove from cache", entry,
        cachedEndBarrier);
      // We have gone past the last known safe point, so the cached barrier is useless now.
      canPushUnder.invalidate(encodedNameAsString);
    }
    // Covers the case where the region is currently opened on us: as long as the sequence ids are
    // continuous we are safe to replicate. A gap may mean the region has been moved to another RS
    // and then back, so the barriers have to be checked.
    MutableLong lastPushedSeqId = pushed.getUnchecked(encodedNameAsString);
    if (seqId == lastPushedSeqId.longValue() + 1) {
      LOG.trace("The sequence id for {} is continuous, pass", entry);
      lastPushedSeqId.increment();
      return true;
    }
    return canPush(entry, CellUtil.cloneRow(firstCellInEdit));
  }

  /**
   * Blocks until the given entry can be pushed, sleeping for the configured wait time between two
   * barrier checks.
   * @param entry           the WAL entry to wait for
   * @param firstCellInEdit the first cell of the entry's edit; its row is used for the barrier
   *                        lookup
   * @throws IOException          if a barrier check fails
   * @throws InterruptedException if the thread is interrupted while sleeping
   */
  public void waitUntilCanPush(Entry entry, Cell firstCellInEdit)
    throws IOException, InterruptedException {
    byte[] row = CellUtil.cloneRow(firstCellInEdit);
    for (;;) {
      if (canPush(entry, row)) {
        return;
      }
      LOG.debug("Can not push {}, wait", entry);
      Thread.sleep(waitTimeMs);
    }
  }
}