001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.master;
019
020import static org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE;
021import static org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE_DEFAULT;
022import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE;
023import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT;
024
025import java.io.IOException;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.hbase.HBaseConfiguration;
029import org.apache.hadoop.hbase.client.Admin;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.ConnectionFactory;
032import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
033import org.apache.hadoop.hbase.replication.ReplicationException;
034import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
035import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
036import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
037import org.apache.hadoop.hbase.zookeeper.ZKUtil;
038import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
039import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.apache.yetus.audience.InterfaceStability;
042import org.apache.zookeeper.KeeperException;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
047
048/**
049 * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x. It will
050 * be removed in HBase 3.x. See HBASE-11393
051 */
052@InterfaceAudience.Private
053@InterfaceStability.Unstable
054public class ReplicationPeerConfigUpgrader {
055
056  private static final String TABLE_CFS_ZNODE = "zookeeper.znode.replication.peers.tableCFs";
057  private static final String TABLE_CFS_ZNODE_DEFAULT = "tableCFs";
058
059  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUpgrader.class);
060  private final Configuration conf;
061  private final ZKWatcher zookeeper;
062  private final ReplicationPeerStorage peerStorage;
063
064  public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper, Configuration conf) throws IOException {
065    this.zookeeper = zookeeper;
066    this.conf = conf;
067    this.peerStorage =
068      ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), zookeeper, conf);
069  }
070
071  public void upgrade() throws Exception {
072    try (Connection conn = ConnectionFactory.createConnection(conf)) {
073      Admin admin = conn.getAdmin();
074      admin.listReplicationPeers().forEach((peerDesc) -> {
075        String peerId = peerDesc.getPeerId();
076        ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig();
077        if (
078          (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
079            || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())
080        ) {
081          peerConfig.setReplicateAllUserTables(false);
082          try {
083            admin.updateReplicationPeerConfig(peerId, peerConfig);
084          } catch (Exception e) {
085            LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e);
086          }
087        }
088      });
089    }
090  }
091
092  public void copyTableCFs() throws ReplicationException {
093    for (String peerId : peerStorage.listPeerIds()) {
094      if (!copyTableCFs(peerId)) {
095        LOG.error("upgrade tableCFs failed for peerId=" + peerId);
096      }
097    }
098  }
099
100  protected String getTableCFsNode(String peerId) {
101    String replicationZNode = ZNodePaths.joinZNode(zookeeper.getZNodePaths().baseZNode,
102      conf.get(REPLICATION_ZNODE, REPLICATION_ZNODE_DEFAULT));
103    String peersZNode =
104      ZNodePaths.joinZNode(replicationZNode, conf.get(PEERS_ZNODE, PEERS_ZNODE_DEFAULT));
105    return ZNodePaths.joinZNode(peersZNode,
106      ZNodePaths.joinZNode(peerId, conf.get(TABLE_CFS_ZNODE, TABLE_CFS_ZNODE_DEFAULT)));
107  }
108
109  public boolean copyTableCFs(String peerId) throws ReplicationException {
110    String tableCFsNode = getTableCFsNode(peerId);
111    try {
112      if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) {
113        ReplicationPeerConfig rpc = peerStorage.getPeerConfig(peerId);
114        // We only need to copy data from tableCFs node to rpc Node the first time hmaster start.
115        if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) {
116          // we copy TableCFs node into PeerNode
117          LOG.info("Copy table ColumnFamilies into peer=" + peerId);
118          ReplicationProtos.TableCF[] tableCFs =
119            ReplicationPeerConfigUtil.parseTableCFs(ZKUtil.getData(this.zookeeper, tableCFsNode));
120          if (tableCFs != null && tableCFs.length > 0) {
121            rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs));
122            peerStorage.updatePeerConfig(peerId, rpc);
123          }
124        } else {
125          LOG.info("No tableCFs in peerNode:" + peerId);
126        }
127      }
128    } catch (KeeperException e) {
129      LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
130      return false;
131    } catch (InterruptedException e) {
132      LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
133      return false;
134    } catch (IOException e) {
135      LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
136      return false;
137    }
138    return true;
139  }
140
141  private static void printUsageAndExit() {
142    System.err.printf(
143      "Usage: hbase org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader"
144        + " [options]");
145    System.err.println(" where [options] are:");
146    System.err.println("  -h|-help      Show this help and exit.");
147    System.err.println("  copyTableCFs  Copy table-cfs to replication peer config");
148    System.err.println("  upgrade           Upgrade replication peer config to new format");
149    System.err.println();
150    System.exit(1);
151  }
152
153  public static void main(String[] args) throws Exception {
154    if (args.length != 1) {
155      printUsageAndExit();
156    }
157    if (args[0].equals("-help") || args[0].equals("-h")) {
158      printUsageAndExit();
159    } else if (args[0].equals("copyTableCFs")) {
160      Configuration conf = HBaseConfiguration.create();
161      try (ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null)) {
162        ReplicationPeerConfigUpgrader tableCFsUpdater =
163          new ReplicationPeerConfigUpgrader(zkw, conf);
164        tableCFsUpdater.copyTableCFs();
165      }
166    } else if (args[0].equals("upgrade")) {
167      Configuration conf = HBaseConfiguration.create();
168      try (ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null)) {
169        ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf);
170        upgrader.upgrade();
171      }
172    } else {
173      printUsageAndExit();
174    }
175  }
176}