/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager.ReplicationQueueStorageInitializer;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.TestZKReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Testcase for {@link ReplicationPeerManager#migrateQueuesFromZk}, which migrates replication
 * queue data (queues, last pushed sequence ids and hfile refs) from zookeeper to the table based
 * replication queue storage.
 */
@Category({ MasterTests.class, MediumTests.class })
public class TestReplicationPeerManagerMigrateQueuesFromZk {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationPeerManagerMigrateQueuesFromZk.class);

  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private static ExecutorService EXECUTOR;

  ConcurrentMap<String, ReplicationPeerDescription> peers;

  private ReplicationPeerStorage peerStorage;

  private ReplicationQueueStorage queueStorage;

  private ReplicationQueueStorageInitializer queueStorageInitializer;

  private ReplicationPeerManager manager;

  private int nServers = 10;

  private int nPeers = 10;

  private int nRegions = 100;

  private ServerName deadServerName;

  @Rule
  public final TableNameTestRule tableNameRule = new TableNameTestRule();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    UTIL.startMiniCluster(1);
    EXECUTOR = Executors.newFixedThreadPool(3,
      new ThreadFactoryBuilder().setDaemon(true)
        .setNameFormat(TestReplicationPeerManagerMigrateQueuesFromZk.class.getSimpleName() + "-%d")
        .build());
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    EXECUTOR.shutdownNow();
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws IOException {
    Configuration conf = UTIL.getConfiguration();
    peerStorage = mock(ReplicationPeerStorage.class);
    TableName tableName = tableNameRule.getTableName();
    UTIL.getAdmin()
      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
    queueStorage = new TableReplicationQueueStorage(UTIL.getConnection(), tableName);
    queueStorageInitializer = mock(ReplicationQueueStorageInitializer.class);
    peers = new ConcurrentHashMap<>();
    deadServerName =
      ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
    manager = new ReplicationPeerManager(UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(),
      peerStorage, queueStorage, peers, conf, "cluster", queueStorageInitializer);
  }

  private Map<String, Set<String>> prepareData() throws Exception {
    ZKReplicationQueueStorageForMigration storage = new ZKReplicationQueueStorageForMigration(
      UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
    TestZKReplicationQueueStorage.mockQueuesData(storage, 10, "peer_0", deadServerName);
    Map<String, Set<String>> encodedName2PeerIds = TestZKReplicationQueueStorage
      .mockLastPushedSeqIds(storage, "peer_1", "peer_2", nRegions, 10, 10);
    TestZKReplicationQueueStorage.mockHFileRefs(storage, 10);
    return encodedName2PeerIds;
  }

  @Test
  public void testNoPeers() throws Exception {
    prepareData();
    manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
    // should have called initializer
    verify(queueStorageInitializer).initialize();
    // should not have migrated any data since there is no peer
    try (Table table = UTIL.getConnection().getTable(tableNameRule.getTableName())) {
      assertEquals(0, HBaseTestingUtil.countRows(table));
    }
  }

  @Test
  public void testMigrate() throws Exception {
    Map<String, Set<String>> encodedName2PeerIds = prepareData();
    // add all peers so we will migrate them all
    for (int i = 0; i < nPeers; i++) {
      // value is not used in this test, so just add a mock
      peers.put("peer_" + i, mock(ReplicationPeerDescription.class));
    }
    manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
    // should have called initializer
    verify(queueStorageInitializer).initialize();
    List<ReplicationQueueData> queueDatas = queueStorage.listAllQueues();
    // there should be two empty queues, so minus 2
    assertEquals(2 * nServers - 2, queueDatas.size());
    for (ReplicationQueueData queueData : queueDatas) {
      assertEquals("peer_0", queueData.getId().getPeerId());
      assertEquals(1, queueData.getOffsets().size());
      String walGroup = queueData.getId().getServerWALsBelongTo().toString();
      ReplicationGroupOffset offset = queueData.getOffsets().get(walGroup);
      assertEquals(0, offset.getOffset());
      assertEquals(queueData.getId().getServerWALsBelongTo().toString() + ".0", offset.getWal());
    }
    // there is no method in ReplicationQueueStorage that can list all the last pushed sequence
    // ids, so scan the storage table directly
    try (Table table = UTIL.getConnection().getTable(tableNameRule.getTableName());
      ResultScanner scanner =
        table.getScanner(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY)) {
      for (int i = 0; i < 2; i++) {
        Result result = scanner.next();
        String peerId = Bytes.toString(result.getRow());
        assertEquals(nRegions, result.size());
        for (Cell cell : result.rawCells()) {
          String encodedRegionName = Bytes.toString(cell.getQualifierArray(),
            cell.getQualifierOffset(), cell.getQualifierLength());
          encodedName2PeerIds.get(encodedRegionName).remove(peerId);
          long seqId =
            Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
          assertEquals(i + 1, seqId);
        }
      }
      encodedName2PeerIds.forEach((encodedRegionName, peerIds) -> {
        assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
      });
      assertNull(scanner.next());
    }
    for (int i = 0; i < nPeers; i++) {
      List<String> refs = queueStorage.getReplicableHFiles("peer_" + i);
      assertEquals(i, refs.size());
      Set<String> refsSet = new HashSet<>(refs);
      for (int j = 0; j < i; j++) {
        assertTrue(refsSet.remove("hfile-" + j));
      }
      assertThat(refsSet, empty());
    }
  }
}