001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import java.io.ByteArrayInputStream;
021import java.io.IOException;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.client.BalanceRequest;
028import org.apache.hadoop.hbase.client.BalanceResponse;
029import org.apache.hadoop.hbase.client.ConnectionFactory;
030import org.apache.hadoop.hbase.client.Result;
031import org.apache.hadoop.hbase.client.Scan;
032import org.apache.hadoop.hbase.client.Table;
033import org.apache.hadoop.hbase.exceptions.DeserializationException;
034import org.apache.hadoop.hbase.net.Address;
035import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
036import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
037import org.apache.hadoop.hbase.zookeeper.ZKUtil;
038import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
039import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.apache.zookeeper.KeeperException;
042import org.junit.Assert;
043
044import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
045import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
046
047@InterfaceAudience.Private
048public class VerifyingRSGroupAdminClient implements RSGroupAdmin {
049  private Table table;
050  private ZKWatcher zkw;
051  private RSGroupAdmin wrapped;
052
053  public VerifyingRSGroupAdminClient(RSGroupAdmin RSGroupAdmin, Configuration conf)
054    throws IOException {
055    wrapped = RSGroupAdmin;
056    table =
057      ConnectionFactory.createConnection(conf).getTable(RSGroupInfoManager.RSGROUP_TABLE_NAME);
058    zkw = new ZKWatcher(conf, this.getClass().getSimpleName(), null);
059  }
060
061  @Override
062  public void addRSGroup(String groupName) throws IOException {
063    wrapped.addRSGroup(groupName);
064    verify();
065  }
066
067  @Override
068  public RSGroupInfo getRSGroupInfo(String groupName) throws IOException {
069    return wrapped.getRSGroupInfo(groupName);
070  }
071
072  @Override
073  public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException {
074    return wrapped.getRSGroupInfoOfTable(tableName);
075  }
076
077  @Override
078  public void moveServers(Set<Address> servers, String targetGroup) throws IOException {
079    wrapped.moveServers(servers, targetGroup);
080    verify();
081  }
082
083  @Override
084  public void moveTables(Set<TableName> tables, String targetGroup) throws IOException {
085    wrapped.moveTables(tables, targetGroup);
086    verify();
087  }
088
089  @Override
090  public void removeRSGroup(String name) throws IOException {
091    wrapped.removeRSGroup(name);
092    verify();
093  }
094
095  @Override
096  public BalanceResponse balanceRSGroup(String groupName, BalanceRequest request)
097    throws IOException {
098    return wrapped.balanceRSGroup(groupName, request);
099  }
100
101  @Override
102  public List<RSGroupInfo> listRSGroups() throws IOException {
103    return wrapped.listRSGroups();
104  }
105
106  @Override
107  public RSGroupInfo getRSGroupOfServer(Address hostPort) throws IOException {
108    return wrapped.getRSGroupOfServer(hostPort);
109  }
110
111  @Override
112  public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String targetGroup)
113    throws IOException {
114    wrapped.moveServersAndTables(servers, tables, targetGroup);
115    verify();
116  }
117
118  @Override
119  public void removeServers(Set<Address> servers) throws IOException {
120    wrapped.removeServers(servers);
121    verify();
122  }
123
124  @Override
125  public void renameRSGroup(String oldName, String newName) throws IOException {
126    wrapped.renameRSGroup(oldName, newName);
127    verify();
128  }
129
130  @Override
131  public void updateRSGroupConfig(String groupName, Map<String, String> configuration)
132    throws IOException {
133    wrapped.updateRSGroupConfig(groupName, configuration);
134    verify();
135  }
136
137  @Override
138  public void updateConfiguration(String groupName) throws IOException {
139    wrapped.updateConfiguration(groupName);
140  }
141
142  public void verify() throws IOException {
143    Map<String, RSGroupInfo> groupMap = Maps.newHashMap();
144    Set<RSGroupInfo> zList = Sets.newHashSet();
145
146    for (Result result : table.getScanner(new Scan())) {
147      RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom(result
148        .getValue(RSGroupInfoManager.META_FAMILY_BYTES, RSGroupInfoManager.META_QUALIFIER_BYTES));
149      groupMap.put(proto.getName(), RSGroupProtobufUtil.toGroupInfo(proto));
150    }
151    Assert.assertEquals(Sets.newHashSet(groupMap.values()),
152      Sets.newHashSet(wrapped.listRSGroups()));
153    try {
154      String groupBasePath = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode, "rsgroup");
155      for (String znode : ZKUtil.listChildrenNoWatch(zkw, groupBasePath)) {
156        byte[] data = ZKUtil.getData(zkw, ZNodePaths.joinZNode(groupBasePath, znode));
157        if (data.length > 0) {
158          ProtobufUtil.expectPBMagicPrefix(data);
159          ByteArrayInputStream bis =
160            new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
161          zList.add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
162        }
163      }
164      Assert.assertEquals(zList.size(), groupMap.size());
165      for (RSGroupInfo RSGroupInfo : zList) {
166        Assert.assertTrue(groupMap.get(RSGroupInfo.getName()).equals(RSGroupInfo));
167      }
168    } catch (KeeperException e) {
169      throw new IOException("ZK verification failed", e);
170    } catch (DeserializationException e) {
171      throw new IOException("ZK verification failed", e);
172    } catch (InterruptedException e) {
173      throw new IOException("ZK verification failed", e);
174    }
175  }
176}