/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_ROWS_IN_BATCH;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class);

  /**
   * Add rows to a table in each cluster, check that they are replicated, delete them, and check
   * that the deletes are replicated. Also check that the puts and deletes are not replicated back
   * to the originating cluster.
   */
  @Test
  public void testSyncUpTool() throws Exception {
    // Set up replication between the Master and one Slave
    // Tables: t1_syncup and t2_syncup
    // column families:
    // 'cf1' : replicated
    // 'norep': not replicated
    setupReplication();

    //
    // at Master:
    // t1_syncup: put 100 rows into cf1, and 1 row into norep
    // t2_syncup: put 200 rows into cf1, and 1 row into norep
    //
    // verify correctly replicated to slave
    putAndReplicateRows();

    // Verify delete works
    //
    // step 1: stop hbase on Slave
    //
    // step 2: at Master:
    // t1_syncup: delete 50 rows from cf1
    // t2_syncup: delete 100 rows from cf1
    // no change on 'norep'
    //
    // step 3: stop hbase on Master, restart hbase on Slave
    //
    // step 4: verify Slave still has the rows before delete
    // t1_syncup: 100 rows in cf1
    // t2_syncup: 200 rows in cf1
    //
    // step 5: run syncup tool on Master
    //
    // step 6: verify that the deletes show up on Slave
    // t1_syncup: 50 rows in cf1
    // t2_syncup: 100 rows in cf1
    //
    // verify correctly replicated to Slave
    mimicSyncUpAfterDelete();

    // Verify put works
    //
    // step 1: stop hbase on Slave
    //
    // step 2: at Master:
    // t1_syncup: put 100 rows into cf1
    // t2_syncup: put 200 rows into cf1
    // and put another row into 'norep'
    // ATTN:
    // put to 'cf1' will overwrite existing rows, so end count will be 100 and 200 respectively
    // put to 'norep' will add a new row.
    //
    // step 3: stop hbase on Master, restart hbase on Slave
    //
    // step 4: verify Slave still has the rows before put
    // t1_syncup: 50 rows in cf1
    // t2_syncup: 100 rows in cf1
    //
    // step 5: run syncup tool on Master
    //
    // step 6: verify that the puts show up on Slave and 'norep' does not
    // t1_syncup: 100 rows in cf1
    // t2_syncup: 200 rows in cf1
    //
    // verify correctly replicated to Slave
    mimicSyncUpAfterPut();
  }

  private void putAndReplicateRows() throws Exception {
    LOG.debug("putAndReplicateRows");
    // add rows to the Master cluster
    Put p;

    // 100 + 1 rows to t1_syncup
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
      ht1Source.put(p);
    }
    p = new Put(Bytes.toBytes("row" + 9999));
    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
    ht1Source.put(p);

    // 200 + 1 rows to t2_syncup
    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
      p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
      ht2Source.put(p);
    }
    p = new Put(Bytes.toBytes("row" + 9999));
    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
    ht2Source.put(p);

    // ensure replication completed
    Thread.sleep(SLEEP_TIME);
    int rowCountHt1Source = countRows(ht1Source);
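    // Poll until the peer converges: the single 'norep' row is never replicated, so the peer
    // is expected to reach the source row count minus one; assert on the final retry.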
    for (int i = 0; i < NB_RETRIES; i++) {
      int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
      if (i == NB_RETRIES - 1) {
        assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCountHt1Source - 1,
          rowCountHt1TargetAtPeer1);
      }
      if (rowCountHt1Source - 1 == rowCountHt1TargetAtPeer1) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }

    int rowCountHt2Source = countRows(ht2Source);
    for (int i = 0; i < NB_RETRIES; i++) {
      int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
      if (i == NB_RETRIES - 1) {
        assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCountHt2Source - 1,
          rowCountHt2TargetAtPeer1);
      }
      if (rowCountHt2Source - 1 == rowCountHt2TargetAtPeer1) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }

  private void mimicSyncUpAfterDelete() throws Exception {
    LOG.debug("mimicSyncUpAfterDelete");
    shutDownTargetHBaseCluster();

    List<Delete> list = new ArrayList<>();
    // delete half of the rows
    for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
      String rowKey = "row" + i;
      Delete del = new Delete(Bytes.toBytes(rowKey));
      list.add(del);
    }
    ht1Source.delete(list);
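    // The same list is reused for the t2_syncup deletes below. Historically Table#delete(List)
    // removes the successfully applied Deletes from the input list; even if it did not, replaying
    // the t1 row keys against t2_syncup would be harmless, since Deletes are idempotent and the
    // t2 loop below covers those row keys anyway.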
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      String rowKey = "row" + i;
      Delete del = new Delete(Bytes.toBytes(rowKey));
      list.add(del);
    }
    ht2Source.delete(list);

    int rowCount_ht1Source = countRows(ht1Source);
    assertEquals("t1_syncup has 51 rows on source after removing 50 rows of the replicated colfam",
      51, rowCount_ht1Source);

    int rowCount_ht2Source = countRows(ht2Source);
    assertEquals(
      "t2_syncup has 101 rows on source after removing 100 rows of the replicated colfam", 101,
      rowCount_ht2Source);

    List<ServerName> sourceRses = UTIL1.getHBaseCluster().getRegionServerThreads().stream()
      .map(rst -> rst.getRegionServer().getServerName()).collect(Collectors.toList());
    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // before sync up
    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);
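    // Run the sync-up tool against the stopped source cluster; it ships the WAL entries that
    // were not yet replicated (here, the deletes) to the peer.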
    syncUp(UTIL1);

    // after sync up
    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should be synced up and have 50 rows", 50,
      rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should be synced up and have 100 rows", 100,
      rowCountHt2TargetAtPeer1);

    // check that we have recorded the dead region servers and also have an info file
    Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
    Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
    FileSystem fs = UTIL1.getTestFileSystem();
    for (ServerName sn : sourceRses) {
      assertTrue(fs.exists(new Path(syncUpInfoDir, sn.getServerName())));
    }
    assertTrue(fs.exists(new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE)));
    assertEquals(sourceRses.size() + 1, fs.listStatus(syncUpInfoDir).length);

    restartSourceHBaseCluster(1);
    // all the records should finally be removed after the restart
    UTIL1.waitFor(60000, () -> fs.listStatus(syncUpInfoDir).length == 0);
  }

  private void mimicSyncUpAfterPut() throws Exception {
    LOG.debug("mimicSyncUpAfterPut");
    shutDownTargetHBaseCluster();

    Put p;
    // another 100 + 1 rows to t1_syncup
    // we should see 100 + 2 rows now
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
      ht1Source.put(p);
    }
    p = new Put(Bytes.toBytes("row" + 9998));
    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
    ht1Source.put(p);

    // another 200 + 1 rows to t2_syncup
    // we should see 200 + 2 rows now
    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
      p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
      ht2Source.put(p);
    }
    p = new Put(Bytes.toBytes("row" + 9998));
    p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
    ht2Source.put(p);

    int rowCount_ht1Source = countRows(ht1Source);
    assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
    int rowCount_ht2Source = countRows(ht2Source);
    assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);

    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // before sync up
    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should NOT be synced up yet and still have 50 rows", 50,
      rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should NOT be synced up yet and still have 100 rows", 100,
      rowCountHt2TargetAtPeer1);

    syncUp(UTIL1);

    // after sync up
    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should be synced up and have 100 rows", 100,
      rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should be synced up and have 200 rows", 200,
      rowCountHt2TargetAtPeer1);
  }

  /**
   * Test starting a new ReplicationSyncUp after the previous one failed. See HBASE-27623 for
   * details.
   */
  @Test
  public void testStartANewSyncUpToolAfterFailed() throws Exception {
    // Start syncUpTool for the first time in non-force mode; assume it fails while syncing data,
    // which does not affect the test results
    syncUp(UTIL1);
    Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
    Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
    Path replicationInfoPath = new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE);
    FileSystem fs = UTIL1.getTestFileSystem();
    assertTrue(fs.exists(replicationInfoPath));
    FileStatus fileStatus1 = fs.getFileStatus(replicationInfoPath);

    // Start syncUpTool for the second time in non-force mode; startup will fail because the
    // replication info file already exists
    try {
      syncUp(UTIL1);
      fail("Should have failed because the replication info file already exists");
    } catch (Exception e) {
      assertTrue("e should be a FileAlreadyExistsException",
        (e instanceof FileAlreadyExistsException));
    }
    FileStatus fileStatus2 = fs.getFileStatus(replicationInfoPath);
    assertEquals(fileStatus1.getModificationTime(), fileStatus2.getModificationTime());

    // Start syncUpTool for the third time in force mode; startup will succeed and create a new
    // replication info file
    syncUp(UTIL1, new String[] { "-f" });
    FileStatus fileStatus3 = fs.getFileStatus(replicationInfoPath);
    assertTrue(fileStatus3.getModificationTime() > fileStatus2.getModificationTime());
  }
}