001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.master; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNull; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.List; 026import java.util.Map; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.Abortable; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtility; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 033import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 034import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; 035import org.apache.hadoop.hbase.testclassification.ReplicationTests; 036import org.apache.hadoop.hbase.testclassification.SmallTests; 037import org.apache.hadoop.hbase.util.Bytes; 038import org.apache.hadoop.hbase.zookeeper.ZKUtil; 039import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 040import org.junit.AfterClass; 041import org.junit.BeforeClass; 042import org.junit.ClassRule; 043import org.junit.Rule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.junit.rules.TestName; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050@Category({ ReplicationTests.class, SmallTests.class }) 051public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { 052 053 @ClassRule 054 public static final HBaseClassTestRule CLASS_RULE = 055 HBaseClassTestRule.forClass(TestTableCFsUpdater.class); 056 057 private static final Logger LOG = LoggerFactory.getLogger(TestTableCFsUpdater.class); 058 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 059 060 private static ZKWatcher zkw = null; 061 private static Abortable abortable = null; 062 private static ZKStorageUtil zkStorageUtil = null; 063 064 private static class ZKStorageUtil extends ZKReplicationPeerStorage { 065 public ZKStorageUtil(ZKWatcher zookeeper, Configuration conf) { 066 super(zookeeper, conf); 067 } 068 } 069 070 @Rule 071 public TestName name = new TestName(); 072 073 public TestTableCFsUpdater() throws IOException { 074 super(zkw, TEST_UTIL.getConfiguration()); 075 } 076 077 @BeforeClass 078 public static void setUpBeforeClass() throws Exception { 079 TEST_UTIL.startMiniZKCluster(); 080 Configuration conf = TEST_UTIL.getConfiguration(); 081 abortable = new Abortable() { 082 @Override 083 public void abort(String why, Throwable e) { 084 LOG.info(why, e); 085 } 086 087 @Override 088 public boolean isAborted() { 089 return false; 090 } 091 }; 092 zkw = new ZKWatcher(conf, "TableCFs", abortable, true); 093 zkStorageUtil = new ZKStorageUtil(zkw, conf); 094 } 095 096 @AfterClass 097 public static void tearDownAfterClass() throws Exception { 098 TEST_UTIL.shutdownMiniZKCluster(); 099 } 100 101 @Test 102 public void testUpgrade() throws Exception { 103 String peerId = "1"; 104 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); 105 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); 106 final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3"); 107 108 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 109 rpc.setClusterKey(zkw.getQuorum()); 110 String peerNode = zkStorageUtil.getPeerNode(peerId); 111 ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); 112 113 String tableCFs = tableName1 + ":cf1,cf2;" + tableName2 + ":cf3;" + tableName3; 114 String tableCFsNode = getTableCFsNode(peerId); 115 LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); 116 ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs)); 117 118 ReplicationPeerConfig actualRpc = 119 ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 120 String actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); 121 122 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 123 assertNull(actualRpc.getTableCFsMap()); 124 assertEquals(tableCFs, actualTableCfs); 125 126 peerId = "2"; 127 rpc = new ReplicationPeerConfig(); 128 rpc.setClusterKey(zkw.getQuorum()); 129 peerNode = zkStorageUtil.getPeerNode(peerId); 130 ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); 131 132 tableCFs = tableName1 + ":cf1,cf3;" + tableName2 + ":cf2"; 133 tableCFsNode = getTableCFsNode(peerId); 134 LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); 135 ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs)); 136 137 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 138 actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); 139 140 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 141 assertNull(actualRpc.getTableCFsMap()); 142 assertEquals(tableCFs, actualTableCfs); 143 144 peerId = "3"; 145 rpc = new ReplicationPeerConfig(); 146 rpc.setClusterKey(zkw.getQuorum()); 147 peerNode = zkStorageUtil.getPeerNode(peerId); 148 ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); 149 150 tableCFs = ""; 151 tableCFsNode = getTableCFsNode(peerId); 152 LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); 153 ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs)); 154 155 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 156 actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); 157 158 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 159 assertNull(actualRpc.getTableCFsMap()); 160 assertEquals(tableCFs, actualTableCfs); 161 162 peerId = "4"; 163 rpc = new ReplicationPeerConfig(); 164 rpc.setClusterKey(zkw.getQuorum()); 165 peerNode = zkStorageUtil.getPeerNode(peerId); 166 ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); 167 168 tableCFsNode = getTableCFsNode(peerId); 169 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 170 actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); 171 172 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 173 assertNull(actualRpc.getTableCFsMap()); 174 assertNull(actualTableCfs); 175 176 copyTableCFs(); 177 178 peerId = "1"; 179 peerNode = zkStorageUtil.getPeerNode(peerId); 180 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 181 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 182 Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap(); 183 assertEquals(3, tableNameListMap.size()); 184 assertTrue(tableNameListMap.containsKey(tableName1)); 185 assertTrue(tableNameListMap.containsKey(tableName2)); 186 assertTrue(tableNameListMap.containsKey(tableName3)); 187 assertEquals(2, tableNameListMap.get(tableName1).size()); 188 assertEquals("cf1", tableNameListMap.get(tableName1).get(0)); 189 assertEquals("cf2", tableNameListMap.get(tableName1).get(1)); 190 assertEquals(1, tableNameListMap.get(tableName2).size()); 191 assertEquals("cf3", tableNameListMap.get(tableName2).get(0)); 192 assertNull(tableNameListMap.get(tableName3)); 193 194 peerId = "2"; 195 peerNode = zkStorageUtil.getPeerNode(peerId); 196 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 197 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 198 tableNameListMap = actualRpc.getTableCFsMap(); 199 assertEquals(2, tableNameListMap.size()); 200 assertTrue(tableNameListMap.containsKey(tableName1)); 201 assertTrue(tableNameListMap.containsKey(tableName2)); 202 assertEquals(2, tableNameListMap.get(tableName1).size()); 203 assertEquals("cf1", tableNameListMap.get(tableName1).get(0)); 204 assertEquals("cf3", tableNameListMap.get(tableName1).get(1)); 205 assertEquals(1, tableNameListMap.get(tableName2).size()); 206 assertEquals("cf2", tableNameListMap.get(tableName2).get(0)); 207 208 peerId = "3"; 209 peerNode = zkStorageUtil.getPeerNode(peerId); 210 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 211 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 212 tableNameListMap = actualRpc.getTableCFsMap(); 213 assertNull(tableNameListMap); 214 215 peerId = "4"; 216 peerNode = zkStorageUtil.getPeerNode(peerId); 217 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 218 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 219 tableNameListMap = actualRpc.getTableCFsMap(); 220 assertNull(tableNameListMap); 221 } 222}