001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
import java.io.IOException;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
033
034/**
035 * A tool for copying replication peer data across different replication peer storages.
036 * <p/>
037 * Notice that we will not delete the replication peer data from the source storage, as this tool
038 * can also be used by online migration. See HBASE-27110 for the whole design.
039 */
040@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
041public class CopyReplicationPeers extends Configured implements Tool {
042
043  private static final Logger LOG = LoggerFactory.getLogger(CopyReplicationPeers.class);
044
045  public static final String NAME = "copyreppeers";
046
047  public CopyReplicationPeers(Configuration conf) {
048    super(conf);
049  }
050
051  private ReplicationPeerStorage create(String type, FileSystem fs, ZKWatcher zk) {
052    Configuration conf = new Configuration(getConf());
053    conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, type);
054    return ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
055  }
056
057  private ZKWatcher createZKWatcher() throws IOException {
058    return new ZKWatcher(getConf(), getClass().getSimpleName(), new Abortable() {
059
060      private volatile boolean aborted;
061
062      @Override
063      public boolean isAborted() {
064        return aborted;
065      }
066
067      @Override
068      public void abort(String why, Throwable e) {
069        aborted = true;
070        LOG.error(why, e);
071        System.exit(1);
072      }
073    });
074  }
075
076  private void migrate(ReplicationPeerStorage src, ReplicationPeerStorage dst)
077    throws ReplicationException {
078    LOG.info("Start migrating from {} to {}", src.getClass().getSimpleName(),
079      dst.getClass().getSimpleName());
080    for (String peerId : src.listPeerIds()) {
081      LOG.info("Going to migrate {}", peerId);
082      ReplicationPeerConfig peerConfig = src.getPeerConfig(peerId);
083      boolean enabled = src.isPeerEnabled(peerId);
084      dst.addPeer(peerId, peerConfig, enabled);
085      LOG.info("Migrated peer {}, peerConfig = '{}', enabled = {}", peerId, peerConfig, enabled);
086    }
087  }
088
089  @Override
090  public int run(String[] args) throws Exception {
091    if (args.length != 2) {
092      System.err.println("Usage: bin/hbase " + NAME
093        + " <SRC_REPLICATION_PEER_STORAGE> <DST_REPLICATION_PEER_STORAGE>");
094      System.err.println("The possible values for replication storage type:");
095      for (ReplicationPeerStorageType type : ReplicationPeerStorageType.values()) {
096        System.err.println("  " + type.name().toLowerCase());
097      }
098      return -1;
099    }
100    FileSystem fs = FileSystem.get(getConf());
101    try (ZKWatcher zk = createZKWatcher()) {
102      ReplicationPeerStorage src = create(args[0], fs, zk);
103      ReplicationPeerStorage dst = create(args[1], fs, zk);
104      migrate(src, dst);
105    }
106    return 0;
107  }
108
109  public static void main(String[] args) throws Exception {
110    Configuration conf = HBaseConfiguration.create();
111    int ret = ToolRunner.run(conf, new CopyReplicationPeers(conf), args);
112    System.exit(ret);
113  }
114}