001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.empty; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertNull; 026import static org.junit.Assert.assertTrue; 027 028import java.io.IOException; 029import java.util.HashMap; 030import java.util.List; 031import java.util.Map; 032import java.util.Set; 033import java.util.concurrent.ThreadLocalRandom; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseZKTestingUtil; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator; 039import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId; 040import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData; 041import org.apache.hadoop.hbase.testclassification.MediumTests; 042import org.apache.hadoop.hbase.testclassification.ReplicationTests; 043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 044import org.apache.hadoop.hbase.util.MD5Hash; 045import org.apache.hadoop.hbase.util.Pair; 046import org.apache.hadoop.hbase.zookeeper.ZKUtil; 047import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 048import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 049import org.apache.zookeeper.KeeperException; 050import org.junit.After; 051import org.junit.AfterClass; 052import org.junit.Before; 053import org.junit.BeforeClass; 054import org.junit.ClassRule; 055import org.junit.Rule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.junit.rules.TestName; 059 060import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 061import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 062import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 063import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 064 065@Category({ ReplicationTests.class, MediumTests.class }) 066public class TestZKReplicationQueueStorage { 067 068 @ClassRule 069 public static final HBaseClassTestRule CLASS_RULE = 070 HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class); 071 072 private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil(); 073 074 private ZKWatcher zk; 075 076 private ZKReplicationQueueStorageForMigration storage; 077 078 @Rule 079 public final TestName name = new TestName(); 080 081 @BeforeClass 082 public static void setUpBeforeClass() throws Exception { 083 UTIL.startMiniZKCluster(); 084 } 085 086 @AfterClass 087 public static void tearDownAfterClass() throws IOException { 088 UTIL.shutdownMiniZKCluster(); 089 } 090 091 @Before 092 public void setUp() throws IOException { 093 Configuration conf = UTIL.getConfiguration(); 094 conf.set(ZKReplicationStorageBase.REPLICATION_ZNODE, name.getMethodName()); 095 zk = new ZKWatcher(conf, name.getMethodName(), null); 096 storage = new ZKReplicationQueueStorageForMigration(zk, conf); 097 } 098 099 @After 100 public void tearDown() throws Exception { 101 ZKUtil.deleteNodeRecursively(zk, storage.replicationZNode); 102 Closeables.close(zk, true); 103 } 104 105 public static void mockQueuesData(ZKReplicationQueueStorageForMigration storage, int nServers, 106 String peerId, ServerName deadServer) throws KeeperException { 107 ZKWatcher zk = storage.zookeeper; 108 for (int i = 0; i < nServers; i++) { 109 ServerName sn = 110 ServerName.valueOf("test-hbase-" + i, 12345, EnvironmentEdgeManager.currentTime()); 111 String rsZNode = ZNodePaths.joinZNode(storage.getQueuesZNode(), sn.toString()); 112 String peerZNode = ZNodePaths.joinZNode(rsZNode, peerId); 113 ZKUtil.createWithParents(zk, peerZNode); 114 for (int j = 0; j < i; j++) { 115 String wal = ZNodePaths.joinZNode(peerZNode, sn.toString() + "." + j); 116 ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j)); 117 } 118 String deadServerPeerZNode = ZNodePaths.joinZNode(rsZNode, peerId + "-" + deadServer); 119 ZKUtil.createWithParents(zk, deadServerPeerZNode); 120 for (int j = 0; j < i; j++) { 121 String wal = ZNodePaths.joinZNode(deadServerPeerZNode, deadServer.toString() + "." + j); 122 if (j > 0) { 123 ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j)); 124 } else { 125 ZKUtil.createWithParents(zk, wal); 126 } 127 } 128 } 129 ZKUtil.createWithParents(zk, 130 ZNodePaths.joinZNode(storage.getQueuesZNode(), deadServer.toString())); 131 } 132 133 private static String getLastPushedSeqIdZNode(String regionsZNode, String encodedName, 134 String peerId) { 135 return ZNodePaths.joinZNode(regionsZNode, encodedName.substring(0, 2), 136 encodedName.substring(2, 4), encodedName.substring(4) + "-" + peerId); 137 } 138 139 public static Map<String, Set<String>> mockLastPushedSeqIds( 140 ZKReplicationQueueStorageForMigration storage, String peerId1, String peerId2, int nRegions, 141 int emptyLevel1Count, int emptyLevel2Count) throws KeeperException { 142 ZKWatcher zk = storage.zookeeper; 143 Map<String, Set<String>> name2PeerIds = new HashMap<>(); 144 byte[] bytes = new byte[32]; 145 for (int i = 0; i < nRegions; i++) { 146 ThreadLocalRandom.current().nextBytes(bytes); 147 String encodeName = MD5Hash.getMD5AsHex(bytes); 148 String znode1 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId1); 149 ZKUtil.createSetData(zk, znode1, ZKUtil.positionToByteArray(1)); 150 String znode2 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId2); 151 ZKUtil.createSetData(zk, znode2, ZKUtil.positionToByteArray(2)); 152 name2PeerIds.put(encodeName, Sets.newHashSet(peerId1, peerId2)); 153 } 154 int addedEmptyZNodes = 0; 155 for (int i = 0; i < 256; i++) { 156 String level1ZNode = 157 ZNodePaths.joinZNode(storage.getRegionsZNode(), String.format("%02x", i)); 158 if (ZKUtil.checkExists(zk, level1ZNode) == -1) { 159 ZKUtil.createWithParents(zk, level1ZNode); 160 addedEmptyZNodes++; 161 if (addedEmptyZNodes <= emptyLevel2Count) { 162 ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(level1ZNode, "ab")); 163 } 164 if (addedEmptyZNodes >= emptyLevel1Count + emptyLevel2Count) { 165 break; 166 } 167 } 168 } 169 return name2PeerIds; 170 } 171 172 public static void mockHFileRefs(ZKReplicationQueueStorageForMigration storage, int nPeers) 173 throws KeeperException { 174 ZKWatcher zk = storage.zookeeper; 175 for (int i = 0; i < nPeers; i++) { 176 String peerId = "peer_" + i; 177 ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId)); 178 for (int j = 0; j < i; j++) { 179 ZKUtil.createWithParents(zk, 180 ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId, "hfile-" + j)); 181 } 182 } 183 } 184 185 @Test 186 public void testDeleteAllData() throws Exception { 187 assertFalse(storage.hasData()); 188 ZKUtil.createWithParents(zk, storage.getQueuesZNode()); 189 assertTrue(storage.hasData()); 190 storage.deleteAllData(); 191 assertFalse(storage.hasData()); 192 } 193 194 @Test 195 public void testEmptyIter() throws Exception { 196 ZKUtil.createWithParents(zk, storage.getQueuesZNode()); 197 ZKUtil.createWithParents(zk, storage.getRegionsZNode()); 198 ZKUtil.createWithParents(zk, storage.getHfileRefsZNode()); 199 assertNull(storage.listAllQueues().next()); 200 assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode())); 201 assertNull(storage.listAllLastPushedSeqIds().next()); 202 assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode())); 203 assertNull(storage.listAllHFileRefs().next()); 204 assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode())); 205 } 206 207 @Test 208 public void testListAllQueues() throws Exception { 209 String peerId = "1"; 210 ServerName deadServer = 211 ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime()); 212 int nServers = 10; 213 mockQueuesData(storage, nServers, peerId, deadServer); 214 MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter = 215 storage.listAllQueues(); 216 ServerName previousServerName = null; 217 for (int i = 0; i < nServers + 1; i++) { 218 Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next(); 219 assertNotNull(pair); 220 if (previousServerName != null) { 221 assertEquals(-1, ZKUtil.checkExists(zk, 222 ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString()))); 223 } 224 ServerName sn = pair.getFirst(); 225 previousServerName = sn; 226 if (sn.equals(deadServer)) { 227 assertThat(pair.getSecond(), empty()); 228 } else { 229 assertEquals(2, pair.getSecond().size()); 230 int n = Integer.parseInt(Iterables.getLast(Splitter.on('-').split(sn.getHostname()))); 231 ZkReplicationQueueData data0 = pair.getSecond().get(0); 232 assertEquals(peerId, data0.getQueueId().getPeerId()); 233 assertEquals(sn, data0.getQueueId().getServerName()); 234 assertEquals(n, data0.getWalOffsets().size()); 235 for (int j = 0; j < n; j++) { 236 assertEquals(j, 237 data0.getWalOffsets().get( 238 (data0.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j) 239 .intValue()); 240 } 241 ZkReplicationQueueData data1 = pair.getSecond().get(1); 242 assertEquals(peerId, data1.getQueueId().getPeerId()); 243 assertEquals(sn, data1.getQueueId().getServerName()); 244 assertEquals(n, data1.getWalOffsets().size()); 245 for (int j = 0; j < n; j++) { 246 assertEquals(j, 247 data1.getWalOffsets().get( 248 (data1.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j) 249 .intValue()); 250 } 251 // the order of the returned result is undetermined 252 if (data0.getQueueId().getSourceServerName().isPresent()) { 253 assertEquals(deadServer, data0.getQueueId().getSourceServerName().get()); 254 assertFalse(data1.getQueueId().getSourceServerName().isPresent()); 255 } else { 256 assertEquals(deadServer, data1.getQueueId().getSourceServerName().get()); 257 } 258 } 259 } 260 assertNull(iter.next()); 261 assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode())); 262 } 263 264 @Test 265 public void testListAllLastPushedSeqIds() throws Exception { 266 String peerId1 = "1"; 267 String peerId2 = "2"; 268 Map<String, Set<String>> name2PeerIds = 269 mockLastPushedSeqIds(storage, peerId1, peerId2, 100, 10, 10); 270 MigrationIterator<List<ZkLastPushedSeqId>> iter = storage.listAllLastPushedSeqIds(); 271 int emptyListCount = 0; 272 for (;;) { 273 List<ZkLastPushedSeqId> list = iter.next(); 274 if (list == null) { 275 break; 276 } 277 if (list.isEmpty()) { 278 emptyListCount++; 279 continue; 280 } 281 for (ZkLastPushedSeqId seqId : list) { 282 name2PeerIds.get(seqId.getEncodedRegionName()).remove(seqId.getPeerId()); 283 if (seqId.getPeerId().equals(peerId1)) { 284 assertEquals(1, seqId.getLastPushedSeqId()); 285 } else { 286 assertEquals(2, seqId.getLastPushedSeqId()); 287 } 288 } 289 } 290 assertEquals(10, emptyListCount); 291 name2PeerIds.forEach((encodedRegionName, peerIds) -> { 292 assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty()); 293 }); 294 assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode())); 295 } 296 297 @Test 298 public void testListAllHFileRefs() throws Exception { 299 int nPeers = 10; 300 mockHFileRefs(storage, nPeers); 301 MigrationIterator<Pair<String, List<String>>> iter = storage.listAllHFileRefs(); 302 String previousPeerId = null; 303 for (int i = 0; i < nPeers; i++) { 304 Pair<String, List<String>> pair = iter.next(); 305 if (previousPeerId != null) { 306 assertEquals(-1, ZKUtil.checkExists(zk, 307 ZNodePaths.joinZNode(storage.getHfileRefsZNode(), previousPeerId))); 308 } 309 String peerId = pair.getFirst(); 310 previousPeerId = peerId; 311 int index = Integer.parseInt(Iterables.getLast(Splitter.on('_').split(peerId))); 312 assertEquals(index, pair.getSecond().size()); 313 } 314 assertNull(iter.next()); 315 assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode())); 316 } 317}