001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.rsgroup; 019 020import java.io.ByteArrayInputStream; 021import java.io.IOException; 022import java.util.List; 023import java.util.Map; 024import java.util.Set; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.TableName; 027import org.apache.hadoop.hbase.client.BalanceRequest; 028import org.apache.hadoop.hbase.client.BalanceResponse; 029import org.apache.hadoop.hbase.client.ConnectionFactory; 030import org.apache.hadoop.hbase.client.Result; 031import org.apache.hadoop.hbase.client.Scan; 032import org.apache.hadoop.hbase.client.Table; 033import org.apache.hadoop.hbase.exceptions.DeserializationException; 034import org.apache.hadoop.hbase.net.Address; 035import org.apache.hadoop.hbase.protobuf.ProtobufUtil; 036import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos; 037import org.apache.hadoop.hbase.zookeeper.ZKUtil; 038import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 039import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.apache.zookeeper.KeeperException; 042import org.junit.Assert; 043 044import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 045import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 046 047@InterfaceAudience.Private 048public class VerifyingRSGroupAdminClient implements RSGroupAdmin { 049 private Table table; 050 private ZKWatcher zkw; 051 private RSGroupAdmin wrapped; 052 053 public VerifyingRSGroupAdminClient(RSGroupAdmin RSGroupAdmin, Configuration conf) 054 throws IOException { 055 wrapped = RSGroupAdmin; 056 table = 057 ConnectionFactory.createConnection(conf).getTable(RSGroupInfoManager.RSGROUP_TABLE_NAME); 058 zkw = new ZKWatcher(conf, this.getClass().getSimpleName(), null); 059 } 060 061 @Override 062 public void addRSGroup(String groupName) throws IOException { 063 wrapped.addRSGroup(groupName); 064 verify(); 065 } 066 067 @Override 068 public RSGroupInfo getRSGroupInfo(String groupName) throws IOException { 069 return wrapped.getRSGroupInfo(groupName); 070 } 071 072 @Override 073 public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException { 074 return wrapped.getRSGroupInfoOfTable(tableName); 075 } 076 077 @Override 078 public void moveServers(Set<Address> servers, String targetGroup) throws IOException { 079 wrapped.moveServers(servers, targetGroup); 080 verify(); 081 } 082 083 @Override 084 public void moveTables(Set<TableName> tables, String targetGroup) throws IOException { 085 wrapped.moveTables(tables, targetGroup); 086 verify(); 087 } 088 089 @Override 090 public void removeRSGroup(String name) throws IOException { 091 wrapped.removeRSGroup(name); 092 verify(); 093 } 094 095 @Override 096 public BalanceResponse balanceRSGroup(String groupName, BalanceRequest request) 097 throws IOException { 098 return wrapped.balanceRSGroup(groupName, request); 099 } 100 101 @Override 102 public List<RSGroupInfo> listRSGroups() throws IOException { 103 return wrapped.listRSGroups(); 104 } 105 106 @Override 107 public RSGroupInfo getRSGroupOfServer(Address hostPort) throws IOException { 108 return wrapped.getRSGroupOfServer(hostPort); 109 } 110 111 @Override 112 public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String targetGroup) 113 throws IOException { 114 wrapped.moveServersAndTables(servers, tables, targetGroup); 115 verify(); 116 } 117 118 @Override 119 public void removeServers(Set<Address> servers) throws IOException { 120 wrapped.removeServers(servers); 121 verify(); 122 } 123 124 @Override 125 public void renameRSGroup(String oldName, String newName) throws IOException { 126 wrapped.renameRSGroup(oldName, newName); 127 verify(); 128 } 129 130 @Override 131 public void updateRSGroupConfig(String groupName, Map<String, String> configuration) 132 throws IOException { 133 wrapped.updateRSGroupConfig(groupName, configuration); 134 verify(); 135 } 136 137 @Override 138 public void updateConfiguration(String groupName) throws IOException { 139 wrapped.updateConfiguration(groupName); 140 } 141 142 public void verify() throws IOException { 143 Map<String, RSGroupInfo> groupMap = Maps.newHashMap(); 144 Set<RSGroupInfo> zList = Sets.newHashSet(); 145 146 for (Result result : table.getScanner(new Scan())) { 147 RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom(result 148 .getValue(RSGroupInfoManager.META_FAMILY_BYTES, RSGroupInfoManager.META_QUALIFIER_BYTES)); 149 groupMap.put(proto.getName(), RSGroupProtobufUtil.toGroupInfo(proto)); 150 } 151 Assert.assertEquals(Sets.newHashSet(groupMap.values()), 152 Sets.newHashSet(wrapped.listRSGroups())); 153 try { 154 String groupBasePath = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode, "rsgroup"); 155 for (String znode : ZKUtil.listChildrenNoWatch(zkw, groupBasePath)) { 156 byte[] data = ZKUtil.getData(zkw, ZNodePaths.joinZNode(groupBasePath, znode)); 157 if (data.length > 0) { 158 ProtobufUtil.expectPBMagicPrefix(data); 159 ByteArrayInputStream bis = 160 new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length); 161 zList.add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis))); 162 } 163 } 164 Assert.assertEquals(zList.size(), groupMap.size()); 165 for (RSGroupInfo RSGroupInfo : zList) { 166 Assert.assertTrue(groupMap.get(RSGroupInfo.getName()).equals(RSGroupInfo)); 167 } 168 } catch (KeeperException e) { 169 throw new IOException("ZK verification failed", e); 170 } catch (DeserializationException e) { 171 throw new IOException("ZK verification failed", e); 172 } catch (InterruptedException e) { 173 throw new IOException("ZK verification failed", e); 174 } 175 } 176}