/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Miscellaneous {@link VerifyReplication} tests. Some of
 * {@link TestVerifyReplicationZkClusterKey}'s tests were moved here because that class could take
 * too long to complete.
 */
@Category({ ReplicationTests.class, LargeTests.class })
public class TestVerifyReplicationAdjunct extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestVerifyReplicationAdjunct.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplicationAdjunct.class);

  private static final String PEER_ID = "2";
  /** Version limit used for the peer table's column family, for scans and for --versions. */
  private static final int MAX_VERSIONS = 100;
  private static final TableName peerTableName = TableName.valueOf("peerTest");
  /** Connection to the second cluster; owns {@link #htable3} and is closed in teardown. */
  private static Connection peerConnection;
  private static Table htable3;

  @Rule
  public TestName name = new TestName();

  @Override
  protected String getClusterKey(HBaseTestingUtil util) throws Exception {
    // TODO: VerifyReplication does not support connection uri yet, so here we need to use cluster
    // key, as in this test we will pass the cluster key config in peer config directly to
    // VerifyReplication job.
    return util.getClusterKey();
  }

  /** Resets replicated state via {@code cleanUp()} and empties the extra peer table. */
  @Before
  public void setUp() throws Exception {
    cleanUp();
    UTIL2.deleteTableData(peerTableName);
  }

  /**
   * Runs the base class setup, then creates the {@code peerTest} table on the second cluster and
   * keeps a handle to it. The connection is retained in a static field so that it can be closed in
   * {@link #tearDownAfterClass()} instead of being leaked.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TestReplicationBase.setUpBeforeClass();
    TableDescriptor peerTable = TableDescriptorBuilder.newBuilder(peerTableName)
      .setColumnFamily(
        ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(MAX_VERSIONS).build())
      .build();
    peerConnection = ConnectionFactory.createConnection(CONF2);
    try (Admin admin2 = peerConnection.getAdmin()) {
      admin2.createTable(peerTable, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    }
    htable3 = peerConnection.getTable(peerTableName);
  }

  /**
   * VerifyReplication should honor the versions option: the peer ends up with five versions of a
   * cell while the source only has three, and the job is run with {@code --versions}.
   */
  @Test
  public void testHBase14905() throws Exception {
    // normal Batch tests
    byte[] qualifierName = Bytes.toBytes("f1");
    // The same Put is re-submitted after every addColumn, so it accumulates all earlier cells.
    Put put = new Put(Bytes.toBytes("r1"));
    long ts = EnvironmentEdgeManager.currentTime();
    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1002"));
    htable1.put(put);
    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v1001"));
    htable1.put(put);
    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v1112"));
    htable1.put(put);

    Result[] sourceRows = scanAllVersions(htable1, 1);
    assertEquals(1, sourceRows.length);
    assertEquals(3, sourceRows[0].getColumnCells(famName, qualifierName).size());

    waitForReplicatedVersions(qualifierName, 3);

    // Write two additional versions directly to the peer so the tables diverge.
    put.addColumn(famName, qualifierName, ts + 4, Bytes.toBytes("v1111"));
    htable2.put(put);
    put.addColumn(famName, qualifierName, ts + 5, Bytes.toBytes("v1112"));
    htable2.put(put);

    Result[] peerRows = scanAllVersions(htable2, NB_ROWS_IN_BATCH);
    assertEquals(1, peerRows.length);
    assertEquals(5, peerRows[0].getColumnCells(famName, qualifierName).size());

    String[] args =
      new String[] { "--versions=" + MAX_VERSIONS, PEER_ID, tableName.getNameAsString() };
    // NOTE(review): the two ints are presumably expected good/bad row counts - confirm against
    // TestVerifyReplicationZkClusterKey.runVerifyReplication.
    TestVerifyReplicationZkClusterKey.runVerifyReplication(args, 0, 1);
  }

  /**
   * VerifyReplication should honor the versions option: with the same number of versions on both
   * sides, overwriting the value of one older version on the peer is still reported.
   */
  @Test
  public void testVersionMismatchHBase14905() throws Exception {
    // normal Batch tests
    byte[] qualifierName = Bytes.toBytes("f1");
    // The same Put is re-submitted after every addColumn, so it accumulates all earlier cells.
    Put put = new Put(Bytes.toBytes("r1"));
    long ts = EnvironmentEdgeManager.currentTime();
    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1"));
    htable1.put(put);
    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2"));
    htable1.put(put);
    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3"));
    htable1.put(put);

    Result[] sourceRows = scanAllVersions(htable1, 1);
    assertEquals(1, sourceRows.length);
    assertEquals(3, sourceRows[0].getColumnCells(famName, qualifierName).size());

    waitForReplicatedVersions(qualifierName, 3);

    try {
      // Disabling replication and modifying the particular version of the cell to validate the
      // feature.
      hbaseAdmin.disableReplicationPeer(PEER_ID);
      Put put2 = new Put(Bytes.toBytes("r1"));
      put2.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v99"));
      htable2.put(put2);

      Result[] peerRows = scanAllVersions(htable2, NB_ROWS_IN_BATCH);
      assertEquals(1, peerRows.length);
      assertEquals(3, peerRows[0].getColumnCells(famName, qualifierName).size());

      String[] args =
        new String[] { "--versions=" + MAX_VERSIONS, PEER_ID, tableName.getNameAsString() };
      TestVerifyReplicationZkClusterKey.runVerifyReplication(args, 0, 1);
    } finally {
      // Always re-enable the peer so later tests still replicate.
      hbaseAdmin.enableReplicationPeer(PEER_ID);
    }
  }

  /**
   * Loads four batches of rows, two of which use the prefixes passed via {@code --row-prefixes},
   * and runs VerifyReplication expecting only those two batches to be counted.
   */
  @Test
  public void testVerifyReplicationPrefixFiltering() throws Exception {
    final byte[] prefixRow = Bytes.toBytes("prefixrow");
    final byte[] prefixRow2 = Bytes.toBytes("secondrow");
    loadData("prefixrow", prefixRow);
    loadData("secondrow", prefixRow2);
    loadData("aaa", row);
    loadData("zzz", row);
    waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4);
    String[] args =
      new String[] { "--row-prefixes=prefixrow,secondrow", PEER_ID, tableName.getNameAsString() };
    TestVerifyReplicationZkClusterKey.runVerifyReplication(args, NB_ROWS_IN_BATCH * 2, 0);
  }

  /**
   * Checks which combinations of snapshot related options are accepted by
   * {@link VerifyReplication#doCommandLine(String[])}: a snapshot name without its tmp dir (or
   * vice versa) is rejected, complete option sets are accepted.
   */
  @Test
  public void testVerifyReplicationSnapshotArguments() {
    String table = tableName.getNameAsString();

    assertCommandLine(false, "--sourceSnapshotName=snapshot1", PEER_ID, table);
    assertCommandLine(false, "--sourceSnapshotTmpDir=tmp", PEER_ID, table);
    assertCommandLine(true, "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp",
      PEER_ID, table);

    assertCommandLine(false, "--peerSnapshotName=snapshot1", PEER_ID, table);
    assertCommandLine(false, "--peerSnapshotTmpDir=/tmp/", PEER_ID, table);
    assertCommandLine(true, "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/",
      "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", PEER_ID,
      table);

    assertCommandLine(true, "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/",
      "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs",
      "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", PEER_ID, table);
  }

  /**
   * Runs VerifyReplication against snapshots of the source and peer tables: first with identical
   * content, then after every peer row has been overwritten and one peer row deleted.
   */
  @Test
  public void testVerifyReplicationWithSnapshotSupport() throws Exception {
    // Populate the tables, at the same time it guarantees that the tables are
    // identical since it does the check
    runSmallBatchTest();

    // Take source and target tables snapshot
    Path rootDir = CommonFSUtils.getRootDir(CONF1);
    FileSystem fs = rootDir.getFileSystem(CONF1);
    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);

    // Take target snapshot
    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);

    String peerFSAddress = peerFs.getUri().toString();
    String temPath1 = UTIL1.getRandomDir().toString();
    String temPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();

    String[] args = snapshotArgs(sourceSnapshotName, temPath1, peerSnapshotName, temPath2,
      peerFSAddress, peerRootDir);
    TestVerifyReplicationZkClusterKey.runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
    TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF1, temPath1, 1);
    TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF2, temPath2, 1);

    // Overwrite the first cell of every peer row, remembering the last row seen so it can be
    // deleted afterwards. The scanner is closed before the delete is issued.
    byte[] lastRow = null;
    try (ResultScanner rs = htable2.getScanner(new Scan())) {
      for (Result result : rs) {
        Put put = new Put(result.getRow());
        Cell firstVal = result.rawCells()[0];
        put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
          Bytes.toBytes("diff data"));
        htable2.put(put);
        lastRow = result.getRow();
      }
    }
    assertNotNull("Peer table is unexpectedly empty", lastRow);
    htable2.delete(new Delete(lastRow));

    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);

    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);

    args = snapshotArgs(sourceSnapshotName, temPath1, peerSnapshotName, temPath2, peerFSAddress,
      peerRootDir);
    TestVerifyReplicationZkClusterKey.runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
    TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF1, temPath1, 2);
    TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF2, temPath2, 2);
  }

  /**
   * Closes the peer table handle and the peer connection, then runs the base class teardown. The
   * base teardown runs even if closing the local resources fails or setup never completed.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    try {
      if (htable3 != null) {
        htable3.close();
      }
    } finally {
      try {
        if (peerConnection != null) {
          peerConnection.close();
        }
      } finally {
        TestReplicationBase.tearDownAfterClass();
      }
    }
  }

  /**
   * Scans {@code table} reading up to {@link #MAX_VERSIONS} versions per cell and returns at most
   * {@code maxRows} rows. The scanner is always closed.
   */
  private static Result[] scanAllVersions(Table table, int maxRows) throws IOException {
    Scan scan = new Scan();
    scan.readVersions(MAX_VERSIONS);
    try (ResultScanner scanner = table.getScanner(scan)) {
      return scanner.next(maxRows);
    }
  }

  /**
   * Polls {@code htable2} until it holds exactly one row whose {@code famName:qualifier} column
   * has {@code expectedVersions} cells. Sleeps {@code SLEEP_TIME} after each unsuccessful attempt
   * and fails the test after {@code NB_RETRIES} attempts.
   */
  private void waitForReplicatedVersions(byte[] qualifier, int expectedVersions)
    throws Exception {
    for (int attempt = 0; attempt < NB_RETRIES; attempt++) {
      Result[] rows = scanAllVersions(htable2, 1);
      if (rows.length != 1) {
        LOG.info("Only got {} rows", rows.length);
      } else {
        int cellNumber = rows[0].getColumnCells(famName, qualifier).size();
        if (cellNumber == expectedVersions) {
          return;
        }
        LOG.info("Only got {} cells", cellNumber);
      }
      Thread.sleep(SLEEP_TIME);
    }
    fail("Waited too much time for normal batch replication");
  }

  /**
   * Asserts that {@link VerifyReplication#doCommandLine(String[])} returns {@code expected} for
   * the given arguments, using the argument list as the failure message.
   */
  private static void assertCommandLine(boolean expected, String... args) {
    boolean accepted = new VerifyReplication().doCommandLine(args);
    String message = Lists.newArrayList(args).toString();
    if (expected) {
      assertTrue(message, accepted);
    } else {
      assertFalse(message, accepted);
    }
  }

  /** Builds the VerifyReplication argument list for a snapshot based run against the peer. */
  private static String[] snapshotArgs(String sourceSnapshotName, String sourceTmpDir,
    String peerSnapshotName, String peerTmpDir, String peerFSAddress, Path peerRootDir) {
    return new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
      "--sourceSnapshotTmpDir=" + sourceTmpDir, "--peerSnapshotName=" + peerSnapshotName,
      "--peerSnapshotTmpDir=" + peerTmpDir, "--peerFSAddress=" + peerFSAddress,
      "--peerHBaseRootAddress=" + peerRootDir, PEER_ID, tableName.getNameAsString() };
  }
}