001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.UUID; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.atomic.AtomicBoolean; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.HBaseTestingUtil; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.Admin; 030import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 031import org.apache.hadoop.hbase.client.Put; 032import org.apache.hadoop.hbase.client.Table; 033import org.apache.hadoop.hbase.client.TableDescriptor; 034import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 035import org.apache.hadoop.hbase.testclassification.MediumTests; 036import org.apache.hadoop.hbase.testclassification.ReplicationTests; 037import org.apache.hadoop.hbase.util.Bytes; 038import org.apache.hadoop.hbase.util.Threads; 039import org.junit.AfterClass; 040import org.junit.Before; 041import org.junit.BeforeClass; 042import org.junit.ClassRule; 043import org.junit.Test; 044import org.junit.experimental.categories.Category; 045 046import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 047 048@Category({ MediumTests.class, ReplicationTests.class }) 049public class TestNonHBaseReplicationEndpoint { 050 051 @ClassRule 052 public static final HBaseClassTestRule CLASS_RULE = 053 HBaseClassTestRule.forClass(TestNonHBaseReplicationEndpoint.class); 054 055 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 056 057 private static Admin ADMIN; 058 059 private static final TableName tableName = TableName.valueOf("test"); 060 private static final byte[] famName = Bytes.toBytes("f"); 061 062 private static final AtomicBoolean REPLICATED = new AtomicBoolean(); 063 064 @BeforeClass 065 public static void setupBeforeClass() throws Exception { 066 UTIL.startMiniCluster(); 067 ADMIN = UTIL.getAdmin(); 068 } 069 070 @AfterClass 071 public static void teardownAfterClass() throws Exception { 072 UTIL.shutdownMiniCluster(); 073 } 074 075 @Before 076 public void setup() { 077 REPLICATED.set(false); 078 } 079 080 @Test 081 public void test() throws IOException { 082 TableDescriptor td = 083 TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder 084 .newBuilder(famName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build(); 085 Table table = UTIL.createTable(td, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 086 087 ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() 088 .setReplicationEndpointImpl(NonHBaseReplicationEndpoint.class.getName()) 089 .setReplicateAllUserTables(false) 090 .setTableCFsMap(ImmutableMap.of(tableName, new ArrayList<>())).build(); 091 092 ADMIN.addReplicationPeer("1", peerConfig); 093 loadData(table); 094 095 UTIL.waitFor(10000L, () -> REPLICATED.get()); 096 } 097 098 protected static void loadData(Table table) throws IOException { 099 for (int i = 0; i < 100; i++) { 100 Put put = new Put(Bytes.toBytes(Integer.toString(i))); 101 put.addColumn(famName, famName, Bytes.toBytes(i)); 102 table.put(put); 103 } 104 } 105 106 public static class NonHBaseReplicationEndpoint implements ReplicationEndpoint { 107 108 private boolean running = false; 109 110 @Override 111 public void init(Context context) throws IOException { 112 } 113 114 @Override 115 public boolean canReplicateToSameCluster() { 116 return false; 117 } 118 119 @Override 120 public UUID getPeerUUID() { 121 return UUID.randomUUID(); 122 } 123 124 @Override 125 public WALEntryFilter getWALEntryfilter() { 126 return null; 127 } 128 129 @Override 130 public boolean replicate(ReplicateContext replicateContext) { 131 REPLICATED.set(true); 132 return true; 133 } 134 135 @Override 136 public boolean isRunning() { 137 return running; 138 } 139 140 @Override 141 public boolean isStarting() { 142 return false; 143 } 144 145 @Override 146 public void start() { 147 running = true; 148 } 149 150 @Override 151 public void awaitRunning() { 152 long interval = 100L; 153 while (!running) { 154 Threads.sleep(interval); 155 } 156 } 157 158 @Override 159 public void awaitRunning(long timeout, TimeUnit unit) { 160 long start = System.currentTimeMillis(); 161 long end = start + unit.toMillis(timeout); 162 long interval = 100L; 163 while (!running && System.currentTimeMillis() < end) { 164 Threads.sleep(interval); 165 } 166 } 167 168 @Override 169 public void stop() { 170 running = false; 171 } 172 173 @Override 174 public void awaitTerminated() { 175 long interval = 100L; 176 while (running) { 177 Threads.sleep(interval); 178 } 179 } 180 181 @Override 182 public void awaitTerminated(long timeout, TimeUnit unit) { 183 long start = System.currentTimeMillis(); 184 long end = start + unit.toMillis(timeout); 185 long interval = 100L; 186 while (running && System.currentTimeMillis() < end) { 187 Threads.sleep(interval); 188 } 189 } 190 191 @Override 192 public Throwable failureCause() { 193 return null; 194 } 195 196 @Override 197 public void peerConfigUpdated(ReplicationPeerConfig rpc) { 198 } 199 } 200}