/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

/**
 * ZK based replication peer storage.
 * <p>
 * Every peer is represented by a znode named after its id underneath the peers znode. The data of
 * that znode is the serialized {@link ReplicationPeerConfig}, and a child znode (see
 * {@link #getPeerStateNode(String)}) holds the serialized enabled/disabled state.
 */
@InterfaceAudience.Private
public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
    implements ReplicationPeerStorage {

  /** Configuration key for the name of the znode under which all peers are stored. */
  public static final String PEERS_ZNODE = "zookeeper.znode.replication.peers";
  /** Name used for the peers znode when {@link #PEERS_ZNODE} is not configured. */
  public static final String PEERS_ZNODE_DEFAULT = "peers";

  /** Configuration key for the name of the per-peer znode that stores the peer state. */
  public static final String PEERS_STATE_ZNODE = "zookeeper.znode.replication.peers.state";
  /** Name used for the peer state znode when {@link #PEERS_STATE_ZNODE} is not configured. */
  public static final String PEERS_STATE_ZNODE_DEFAULT = "peer-state";

  /** Serialized form of the ENABLED replication state, as written to the peer state znode. */
  public static final byte[] ENABLED_ZNODE_BYTES =
    toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
  /** Serialized form of the DISABLED replication state, as written to the peer state znode. */
  public static final byte[] DISABLED_ZNODE_BYTES =
    toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);

  /**
   * The name of the znode that contains the replication status of a remote slave (i.e. peer)
   * cluster.
   */
  private final String peerStateNodeName;

  /**
   * The name of the znode that contains a list of all remote slave (i.e. peer) clusters.
   */
  private final String peersZNode;

  /**
   * Resolves the znode names used by this storage from the given configuration, falling back to
   * {@link #PEERS_STATE_ZNODE_DEFAULT} and {@link #PEERS_ZNODE_DEFAULT} when they are not set.
   * @param zookeeper the watcher handed to the base class and used for all ZooKeeper operations
   * @param conf the configuration the znode names are read from
   */
  public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) {
    super(zookeeper, conf);
    this.peerStateNodeName = conf.get(PEERS_STATE_ZNODE, PEERS_STATE_ZNODE_DEFAULT);
    String peersZNodeName = conf.get(PEERS_ZNODE, PEERS_ZNODE_DEFAULT);
    this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName);
  }

  /**
   * Builds the path of the znode that stores the enabled/disabled state of a peer.
   * @param peerId the id of the peer
   * @return the state znode path, which is a child of {@link #getPeerNode(String)}
   */
  public String getPeerStateNode(String peerId) {
    return ZNodePaths.joinZNode(getPeerNode(peerId), peerStateNodeName);
  }

  /**
   * Builds the path of the znode that stores the configuration of a peer.
   * @param peerId the id of the peer
   * @return the peer znode path, which is a child of the peers znode
   */
  public String getPeerNode(String peerId) {
    return ZNodePaths.joinZNode(peersZNode, peerId);
  }

  /**
   * Stores a new peer: its serialized configuration in the peer znode and its initial state in the
   * peer state znode.
   * @param peerId the id of the peer
   * @param peerConfig the configuration to serialize into the peer znode
   * @param enabled whether the peer state is initially written as enabled or disabled
   * @throws ReplicationException if a ZooKeeper operation fails
   */
  @Override
  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
      throws ReplicationException {
    try {
      // Make sure the parent of all peer znodes exists before creating the children.
      ZKUtil.createWithParents(zookeeper, peersZNode);
      // Create the config znode and the state znode through a single multiOrSequential call.
      // NOTE(review): the trailing 'false' presumably disables the sequential fallback when the
      // multi fails - confirm against ZKUtil.multiOrSequential.
      ZKUtil.multiOrSequential(zookeeper,
        Arrays.asList(
          ZKUtilOp.createAndFailSilent(getPeerNode(peerId),
            ReplicationPeerConfigUtil.toByteArray(peerConfig)),
          ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
            enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)),
        false);
    } catch (KeeperException e) {
      throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfig=>"
        + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
    }
  }

  /**
   * Removes a peer by recursively deleting its znode, which also removes the state znode below it.
   * @param peerId the id of the peer
   * @throws ReplicationException if the ZooKeeper operation fails
   */
  @Override
  public void removePeer(String peerId) throws ReplicationException {
    try {
      ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId));
    } catch (KeeperException e) {
      throw new ReplicationException("Could not remove peer with id=" + peerId, e);
    }
  }

  /**
   * Overwrites the data of the peer state znode with the requested state.
   * @param peerId the id of the peer
   * @param enabled {@code true} to write {@link #ENABLED_ZNODE_BYTES}, {@code false} to write
   *          {@link #DISABLED_ZNODE_BYTES}
   * @throws ReplicationException if the ZooKeeper operation fails
   */
  @Override
  public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
    byte[] stateBytes = enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
    try {
      ZKUtil.setData(zookeeper, getPeerStateNode(peerId), stateBytes);
    } catch (KeeperException e) {
      throw new ReplicationException("Unable to change state of the peer with id=" + peerId, e);
    }
  }

  /**
   * Overwrites the data of the peer znode with the serialized form of the given configuration.
   * @param peerId the id of the peer
   * @param peerConfig the new configuration
   * @throws ReplicationException if the ZooKeeper operation fails
   */
  @Override
  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
      throws ReplicationException {
    try {
      ZKUtil.setData(this.zookeeper, getPeerNode(peerId),
        ReplicationPeerConfigUtil.toByteArray(peerConfig));
    } catch (KeeperException e) {
      throw new ReplicationException(
        "There was a problem trying to save changes to the replication peer " + peerId, e);
    }
  }

  /**
   * Lists the ids of all stored peers, i.e. the names of the children of the peers znode.
   * @return the peer ids; an empty list when the child listing comes back as {@code null}
   * @throws ReplicationException if the ZooKeeper operation fails
   */
  @Override
  public List<String> listPeerIds() throws ReplicationException {
    try {
      List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, peersZNode);
      return children != null ? children : Collections.emptyList();
    } catch (KeeperException e) {
      throw new ReplicationException("Cannot get the list of peers", e);
    }
  }

  /**
   * Reads the peer state znode and compares its data with {@link #ENABLED_ZNODE_BYTES}.
   * @param peerId the id of the peer
   * @return {@code true} only if the stored bytes equal {@link #ENABLED_ZNODE_BYTES}
   * @throws ReplicationException if the ZooKeeper operation fails or the calling thread is
   *           interrupted; in the latter case the thread's interrupt status is set again
   */
  @Override
  public boolean isPeerEnabled(String peerId) throws ReplicationException {
    try {
      return Arrays.equals(ENABLED_ZNODE_BYTES,
        ZKUtil.getData(zookeeper, getPeerStateNode(peerId)));
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers further up can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e);
    } catch (KeeperException e) {
      throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e);
    }
  }

  /**
   * Reads the peer znode and parses its data into a {@link ReplicationPeerConfig}.
   * @param peerId the id of the peer
   * @return the parsed peer configuration
   * @throws ReplicationException if the ZooKeeper operation fails, the calling thread is
   *           interrupted (its interrupt status is set again), the znode data is {@code null} or
   *           empty, or the data cannot be parsed
   */
  @Override
  public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException {
    byte[] data;
    try {
      data = ZKUtil.getData(zookeeper, getPeerNode(peerId));
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers further up can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
    } catch (KeeperException e) {
      throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
    }
    if (data == null || data.length == 0) {
      throw new ReplicationException(
        "Replication peer config data shouldn't be empty, peerId=" + peerId);
    }
    try {
      return ReplicationPeerConfigUtil.parsePeerFrom(data);
    } catch (DeserializationException e) {
      throw new ReplicationException(
        "Failed to parse replication peer config for peer with id=" + peerId, e);
    }
  }
}