/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_ROWS_IN_BATCH;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises the replication sync-up tool: rows are put and deleted on the source ("Master")
 * cluster while the target ("Slave") cluster is down, and the tool is then run against the source
 * to bring the target's replicated column family up to date.
 */
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class);

  /**
   * Puts rows on the source and checks they are replicated, then verifies that deletes and puts
   * issued while the target is offline only reach the target once the sync-up tool has been run.
   * Rows written to the 'norep' column family must never show up on the target.
   */
  @Test
  public void testSyncUpTool() throws Exception {
    /*
     * Set up Replication: on Master and one Slave. Tables: t1_syncup and t2_syncup. Column
     * families: 'cf1' is replicated, 'norep' is not replicated.
     */
    setupReplication();

    /*
     * At Master: t1_syncup: put 100 rows into cf1, and 1 row into norep; t2_syncup: put 200 rows
     * into cf1, and 1 row into norep. Verify correctly replicated to Slave.
     */
    putAndReplicateRows();

    /*
     * Verify delete works.
     * step 1: stop hbase on Slave
     * step 2: at Master: t1_syncup: delete 50 rows from cf1; t2_syncup: delete 100 rows from cf1;
     *         no change on 'norep'
     * step 3: stop hbase on Master, restart hbase on Slave
     * step 4: verify Slave still has the rows from before the delete: t1_syncup: 100 rows from
     *         cf1; t2_syncup: 200 rows from cf1
     * step 5: run syncup tool on Master
     * step 6: verify that the deletes show up on Slave: t1_syncup: 50 rows from cf1; t2_syncup:
     *         100 rows from cf1
     */
    mimicSyncUpAfterDelete();

    /*
     * Verify put works.
     * step 1: stop hbase on Slave
     * step 2: at Master: t1_syncup: put 100 rows into cf1; t2_syncup: put 200 rows into cf1; and
     *         put another row on 'norep'. ATTN: the puts to 'cf1' overwrite existing rows, so the
     *         end count will be 100 and 200 respectively; the put to 'norep' adds a new row.
     * step 3: stop hbase on Master, restart hbase on Slave
     * step 4: verify Slave still has the rows from before the put: t1_syncup: 50 rows from cf1;
     *         t2_syncup: 100 rows from cf1
     * step 5: run syncup tool on Master
     * step 6: verify that the puts show up on Slave and 'norep' does not: t1_syncup: 100 rows from
     *         cf1; t2_syncup: 200 rows from cf1
     */
    mimicSyncUpAfterPut();
  }

  /**
   * Writes the initial data set on the source cluster and waits until the replicated column
   * family has arrived on the target: the target of each table must end up with exactly one row
   * fewer than the source, the missing one being the row written to the 'norep' family.
   */
  private void putAndReplicateRows() throws Exception {
    LOG.debug("putAndReplicateRows");

    // NB_ROWS_IN_BATCH replicated rows + 1 non-replicated row to t1_syncup
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      ht1Source.put(newPut(FAMILY, i));
    }
    ht1Source.put(newPut(NO_REP_FAMILY, 9999));

    // twice as many replicated rows + 1 non-replicated row to t2_syncup
    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
      ht2Source.put(newPut(FAMILY, i));
    }
    ht2Source.put(newPut(NO_REP_FAMILY, 9999));

    // ensure replication completed
    Thread.sleep(SLEEP_TIME);
    int rowCountHt1Source = UTIL1.countRows(ht1Source);
    for (int i = 0; i < NB_RETRIES; i++) {
      int rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
      if (i == NB_RETRIES - 1) {
        // last attempt: fail the test if the target still has not caught up
        assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCountHt1Source - 1,
          rowCountHt1TargetAtPeer1);
      }
      if (rowCountHt1Source - 1 == rowCountHt1TargetAtPeer1) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }

    int rowCountHt2Source = UTIL1.countRows(ht2Source);
    for (int i = 0; i < NB_RETRIES; i++) {
      int rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
      if (i == NB_RETRIES - 1) {
        // last attempt: fail the test if the target still has not caught up
        assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCountHt2Source - 1,
          rowCountHt2TargetAtPeer1);
      }
      if (rowCountHt2Source - 1 == rowCountHt2TargetAtPeer1) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }

  /**
   * Deletes half of the replicated rows of each table on the source while the target is down,
   * checks the target is unchanged after it comes back, then runs the sync-up tool and verifies
   * the deletes have reached the target.
   */
  private void mimicSyncUpAfterDelete() throws Exception {
    LOG.debug("mimicSyncUpAfterDelete");
    shutDownTargetHBaseCluster();

    // Delete half of the replicated rows of each table. Each table gets its own freshly built
    // list, so the batch sent to t2_syncup does not depend on whether Table#delete(List) removes
    // applied deletes from the list passed for t1_syncup.
    // NOTE(review): that list-mutation behavior is version dependent - verify against the client
    // API docs of the HBase version in use.
    ht1Source.delete(newDeletes(NB_ROWS_IN_BATCH / 2));
    ht2Source.delete(newDeletes(NB_ROWS_IN_BATCH));

    int rowCount_ht1Source = UTIL1.countRows(ht1Source);
    assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
      rowCount_ht1Source);

    int rowCount_ht2Source = UTIL1.countRows(ht2Source);
    assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101,
      rowCount_ht2Source);

    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // before sync up
    int rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
    int rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);

    // after sync up
    syncUpAndVerify("SyncUpAfterDelete", 50, 100, 51, 101);
  }

  /**
   * Re-puts all replicated rows (plus one more 'norep' row per table) on the source while the
   * target is down, checks the target is unchanged after it comes back, then runs the sync-up tool
   * and verifies the puts have reached the target.
   */
  private void mimicSyncUpAfterPut() throws Exception {
    LOG.debug("mimicSyncUpAfterPut");
    restartSourceHBaseCluster(1);
    shutDownTargetHBaseCluster();

    // another 100 + 1 row to t1_syncup
    // we should see 100 + 2 rows now
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      ht1Source.put(newPut(FAMILY, i));
    }
    ht1Source.put(newPut(NO_REP_FAMILY, 9998));

    // another 200 + 1 row to t2_syncup
    // we should see 200 + 2 rows now
    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
      ht2Source.put(newPut(FAMILY, i));
    }
    ht2Source.put(newPut(NO_REP_FAMILY, 9998));

    int rowCount_ht1Source = UTIL1.countRows(ht1Source);
    assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
    int rowCount_ht2Source = UTIL1.countRows(ht2Source);
    assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);

    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // before sync up
    int rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
    int rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
      rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
      rowCountHt2TargetAtPeer1);

    // after sync up
    syncUpAndVerify("SyncUpAfterPut", 100, 200, 102, 202);
  }

  /**
   * Runs the sync-up tool against the source cluster up to {@code NB_RETRIES} times, until both
   * target tables hold the expected number of rows. On the last attempt the counts are asserted,
   * so the test fails if the target never converges; before failing, the source cluster is
   * restarted and its row counts are logged to help diagnose the problem.
   * @param phase            label used in log messages
   * @param expectedT1Target expected row count of t1_syncup on the target
   * @param expectedT2Target expected row count of t2_syncup on the target
   * @param expectedT1Source row count t1_syncup should have on the source (diagnostics only)
   * @param expectedT2Source row count t2_syncup should have on the source (diagnostics only)
   */
  private void syncUpAndVerify(String phase, int expectedT1Target, int expectedT2Target,
    int expectedT1Source, int expectedT2Source) throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      syncUp(UTIL1);
      int t1TargetRows = UTIL2.countRows(ht1TargetAtPeer1);
      int t2TargetRows = UTIL2.countRows(ht2TargetAtPeer1);
      boolean synced = t1TargetRows == expectedT1Target && t2TargetRows == expectedT2Target;
      if (i == NB_RETRIES - 1) {
        if (!synced) {
          // syncUp still failed. Let's look at the source in case anything is wrong there
          restartSourceHBaseCluster(1);
          int t1SourceRows = UTIL1.countRows(ht1Source);
          LOG.debug("t1_syncup should have {} rows at source, and it is {}", expectedT1Source,
            t1SourceRows);
          int t2SourceRows = UTIL1.countRows(ht2Source);
          LOG.debug("t2_syncup should have {} rows at source, and it is {}", expectedT2Source,
            t2SourceRows);
        }
        assertEquals("@Peer1 t1_syncup should be sync up and have " + expectedT1Target + " rows",
          expectedT1Target, t1TargetRows);
        assertEquals("@Peer1 t2_syncup should be sync up and have " + expectedT2Target + " rows",
          expectedT2Target, t2TargetRows);
      }
      if (synced) {
        LOG.info("{} succeeded at retry = {}", phase, i);
        break;
      }
      LOG.debug(
        "{} failed at retry = {}, with rowCount_ht1TargetPeer1 ={} and rowCount_ht2TargetAtPeer1 ={}",
        phase, i, t1TargetRows, t2TargetRows);
      Thread.sleep(SLEEP_TIME);
    }
  }

  /**
   * Builds a Put for row {@code "row" + rowIndex} that stores {@code "val" + rowIndex} under
   * {@code QUALIFIER} in the given column family.
   */
  private Put newPut(byte[] family, int rowIndex) {
    Put put = new Put(Bytes.toBytes("row" + rowIndex));
    put.addColumn(family, QUALIFIER, Bytes.toBytes("val" + rowIndex));
    return put;
  }

  /**
   * Builds a new mutable list holding one whole-row Delete for each of {@code "row0"} ..
   * {@code "row" + (count - 1)}.
   */
  private static List<Delete> newDeletes(int count) {
    List<Delete> deletes = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      deletes.add(new Delete(Bytes.toBytes("row" + i)));
    }
    return deletes;
  }
}