001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL; 021 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.HBaseTestingUtil; 024import org.apache.hadoop.hbase.TableName; 025import org.apache.hadoop.hbase.client.Admin; 026import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 027import org.apache.hadoop.hbase.client.Connection; 028import org.apache.hadoop.hbase.client.ConnectionFactory; 029import org.apache.hadoop.hbase.client.Table; 030import org.apache.hadoop.hbase.client.TableDescriptor; 031import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 032import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.util.ToolRunner; 035import org.junit.After; 036import org.junit.Before; 037 038import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 039 040public abstract class TestReplicationSyncUpToolBase { 041 042 protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil(); 043 protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil(); 044 045 protected static final TableName TN1 = TableName.valueOf("t1_syncup"); 046 protected static final TableName TN2 = TableName.valueOf("t2_syncup"); 047 048 protected static final byte[] FAMILY = Bytes.toBytes("cf1"); 049 protected static final byte[] QUALIFIER = Bytes.toBytes("q1"); 050 051 protected static final byte[] NO_REP_FAMILY = Bytes.toBytes("norep"); 052 053 protected TableDescriptor t1SyncupSource; 054 protected TableDescriptor t1SyncupTarget; 055 protected TableDescriptor t2SyncupSource; 056 protected TableDescriptor t2SyncupTarget; 057 058 protected Connection conn1; 059 protected Connection conn2; 060 061 protected Table ht1Source; 062 protected Table ht2Source; 063 protected Table ht1TargetAtPeer1; 064 protected Table ht2TargetAtPeer1; 065 066 protected void customizeClusterConf(Configuration conf) { 067 } 068 069 @Before 070 public void setUp() throws Exception { 071 customizeClusterConf(UTIL1.getConfiguration()); 072 customizeClusterConf(UTIL2.getConfiguration()); 073 TestReplicationBase.configureClusters(UTIL1, UTIL2); 074 UTIL1.startMiniZKCluster(); 075 UTIL2.setZkCluster(UTIL1.getZkCluster()); 076 077 UTIL1.startMiniCluster(2); 078 UTIL2.startMiniCluster(4); 079 080 t1SyncupSource = TableDescriptorBuilder.newBuilder(TN1) 081 .setColumnFamily( 082 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build()) 083 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build(); 084 085 t1SyncupTarget = TableDescriptorBuilder.newBuilder(TN1) 086 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 087 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build(); 088 089 t2SyncupSource = TableDescriptorBuilder.newBuilder(TN2) 090 .setColumnFamily( 091 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build()) 092 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build(); 093 094 t2SyncupTarget = TableDescriptorBuilder.newBuilder(TN2) 095 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 096 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build(); 097 } 098 099 @After 100 public void tearDown() throws Exception { 101 Closeables.close(ht1Source, true); 102 Closeables.close(ht2Source, true); 103 Closeables.close(ht1TargetAtPeer1, true); 104 Closeables.close(ht2TargetAtPeer1, true); 105 Closeables.close(conn1, true); 106 Closeables.close(conn2, true); 107 UTIL2.shutdownMiniCluster(); 108 UTIL1.shutdownMiniCluster(); 109 } 110 111 final void setupReplication() throws Exception { 112 Admin admin1 = UTIL1.getAdmin(); 113 admin1.createTable(t1SyncupSource); 114 admin1.createTable(t2SyncupSource); 115 116 Admin admin2 = UTIL2.getAdmin(); 117 admin2.createTable(t1SyncupTarget); 118 admin2.createTable(t2SyncupTarget); 119 120 // Get HTable from Master 121 conn1 = ConnectionFactory.createConnection(UTIL1.getConfiguration()); 122 ht1Source = conn1.getTable(TN1); 123 ht2Source = conn1.getTable(TN2); 124 125 // Get HTable from Peer1 126 conn2 = ConnectionFactory.createConnection(UTIL2.getConfiguration()); 127 ht1TargetAtPeer1 = conn2.getTable(TN1); 128 ht2TargetAtPeer1 = conn2.getTable(TN2); 129 130 /** 131 * set M-S : Master: utility1 Slave1: utility2 132 */ 133 ReplicationPeerConfig rpc = 134 ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getZkConnectionURI()).build(); 135 admin1.addReplicationPeer("1", rpc); 136 } 137 138 final void syncUp(HBaseTestingUtil util) throws Exception { 139 syncUp(util, new String[0]); 140 } 141 142 final void syncUp(HBaseTestingUtil util, String[] args) throws Exception { 143 ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(), args); 144 } 145 146 // Utilities that manager shutdown / restart of source / sink clusters. They take care of 147 // invalidating stale connections after shutdown / restarts. 148 final void shutDownSourceHBaseCluster() throws Exception { 149 Closeables.close(ht1Source, true); 150 Closeables.close(ht2Source, true); 151 UTIL1.shutdownMiniHBaseCluster(); 152 } 153 154 final void shutDownTargetHBaseCluster() throws Exception { 155 Closeables.close(ht1TargetAtPeer1, true); 156 Closeables.close(ht2TargetAtPeer1, true); 157 UTIL2.shutdownMiniHBaseCluster(); 158 } 159 160 final void restartSourceHBaseCluster(int numServers) throws Exception { 161 Closeables.close(ht1Source, true); 162 Closeables.close(ht2Source, true); 163 UTIL1.restartHBaseCluster(numServers); 164 ht1Source = UTIL1.getConnection().getTable(TN1); 165 ht2Source = UTIL1.getConnection().getTable(TN2); 166 } 167 168 final void restartTargetHBaseCluster(int numServers) throws Exception { 169 Closeables.close(ht1TargetAtPeer1, true); 170 Closeables.close(ht2TargetAtPeer1, true); 171 UTIL2.restartHBaseCluster(numServers); 172 ht1TargetAtPeer1 = UTIL2.getConnection().getTable(TN1); 173 ht2TargetAtPeer1 = UTIL2.getConnection().getTable(TN2); 174 } 175}