001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.util.ArrayList; 027import java.util.List; 028import java.util.NavigableMap; 029import java.util.TreeMap; 030import org.apache.hadoop.hbase.CellUtil; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.KeyValue; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Admin; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 037import org.apache.hadoop.hbase.client.Delete; 038import org.apache.hadoop.hbase.client.Get; 039import org.apache.hadoop.hbase.client.Put; 040import org.apache.hadoop.hbase.client.RegionInfo; 041import org.apache.hadoop.hbase.client.Result; 042import org.apache.hadoop.hbase.client.ResultScanner; 043import org.apache.hadoop.hbase.client.Scan; 044import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 045import org.apache.hadoop.hbase.client.replication.TableCFs; 046import org.apache.hadoop.hbase.regionserver.HRegion; 047import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 048import org.apache.hadoop.hbase.testclassification.LargeTests; 049import org.apache.hadoop.hbase.testclassification.ReplicationTests; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.hadoop.hbase.wal.WAL; 053import org.apache.hadoop.hbase.wal.WALEdit; 054import org.apache.hadoop.hbase.wal.WALKeyImpl; 055import org.junit.Before; 056import org.junit.ClassRule; 057import org.junit.Test; 058import org.junit.experimental.categories.Category; 059import org.junit.runner.RunWith; 060import org.junit.runners.Parameterized; 061import org.junit.runners.Parameterized.Parameter; 062import org.junit.runners.Parameterized.Parameters; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 067 068@RunWith(Parameterized.class) 069@Category({ ReplicationTests.class, LargeTests.class }) 070public class TestReplicationSmallTests extends TestReplicationBase { 071 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestReplicationSmallTests.class); 075 076 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSmallTests.class); 077 private static final String PEER_ID = "2"; 078 079 @Parameter 080 public boolean serialPeer; 081 082 @Override 083 protected boolean isSerialPeer() { 084 return serialPeer; 085 } 086 087 @Parameters(name = "{index}: serialPeer={0}") 088 public static List<Boolean> parameters() { 089 return ImmutableList.of(true, false); 090 } 091 092 @Before 093 public void setUp() throws Exception { 094 cleanUp(); 095 } 096 097 /** 098 * Verify that version and column delete marker types are replicated correctly. 099 */ 100 @Test 101 public void testDeleteTypes() throws Exception { 102 LOG.info("testDeleteTypes"); 103 final byte[] v1 = Bytes.toBytes("v1"); 104 final byte[] v2 = Bytes.toBytes("v2"); 105 final byte[] v3 = Bytes.toBytes("v3"); 106 htable1 = UTIL1.getConnection().getTable(tableName); 107 108 long t = EnvironmentEdgeManager.currentTime(); 109 // create three versions for "row" 110 Put put = new Put(row); 111 put.addColumn(famName, row, t, v1); 112 htable1.put(put); 113 114 put = new Put(row); 115 put.addColumn(famName, row, t + 1, v2); 116 htable1.put(put); 117 118 put = new Put(row); 119 put.addColumn(famName, row, t + 2, v3); 120 htable1.put(put); 121 122 Get get = new Get(row); 123 get.readAllVersions(); 124 for (int i = 0; i < NB_RETRIES; i++) { 125 if (i == NB_RETRIES - 1) { 126 fail("Waited too much time for put replication"); 127 } 128 Result res = htable2.get(get); 129 if (res.size() < 3) { 130 LOG.info("Rows not available"); 131 Thread.sleep(SLEEP_TIME); 132 } else { 133 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3); 134 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2); 135 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1); 136 break; 137 } 138 } 139 // place a version delete marker (delete last version) 140 Delete d = new Delete(row); 141 d.addColumn(famName, row, t); 142 htable1.delete(d); 143 144 get = new Get(row); 145 get.readAllVersions(); 146 for (int i = 0; i < NB_RETRIES; i++) { 147 if (i == NB_RETRIES - 1) { 148 fail("Waited too much time for put replication"); 149 } 150 Result res = htable2.get(get); 151 if (res.size() > 2) { 152 LOG.info("Version not deleted"); 153 Thread.sleep(SLEEP_TIME); 154 } else { 155 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3); 156 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2); 157 break; 158 } 159 } 160 161 // place a column delete marker 162 d = new Delete(row); 163 d.addColumns(famName, row, t + 2); 164 htable1.delete(d); 165 166 // now *both* of the remaining version should be deleted 167 // at the replica 168 get = new Get(row); 169 for (int i = 0; i < NB_RETRIES; i++) { 170 if (i == NB_RETRIES - 1) { 171 fail("Waited too much time for del replication"); 172 } 173 Result res = htable2.get(get); 174 if (res.size() >= 1) { 175 LOG.info("Rows not deleted"); 176 Thread.sleep(SLEEP_TIME); 177 } else { 178 break; 179 } 180 } 181 } 182 183 /** 184 * Add a row, check it's replicated, delete it, check's gone 185 */ 186 @Test 187 public void testSimplePutDelete() throws Exception { 188 LOG.info("testSimplePutDelete"); 189 runSimplePutDeleteTest(); 190 } 191 192 /** 193 * Try a small batch upload using the write buffer, check it's replicated 194 */ 195 @Test 196 public void testSmallBatch() throws Exception { 197 LOG.info("testSmallBatch"); 198 runSmallBatchTest(); 199 } 200 201 /** 202 * Test disable/enable replication, trying to insert, make sure nothing's replicated, enable it, 203 * the insert should be replicated 204 */ 205 @Test 206 public void testDisableEnable() throws Exception { 207 // Test disabling replication 208 hbaseAdmin.disableReplicationPeer(PEER_ID); 209 210 byte[] rowkey = Bytes.toBytes("disable enable"); 211 Put put = new Put(rowkey); 212 put.addColumn(famName, row, row); 213 htable1.put(put); 214 215 Get get = new Get(rowkey); 216 for (int i = 0; i < NB_RETRIES; i++) { 217 Result res = htable2.get(get); 218 if (res.size() >= 1) { 219 fail("Replication wasn't disabled"); 220 } else { 221 LOG.info("Row not replicated, let's wait a bit more..."); 222 Thread.sleep(SLEEP_TIME); 223 } 224 } 225 226 // Test enable replication 227 hbaseAdmin.enableReplicationPeer(PEER_ID); 228 229 for (int i = 0; i < NB_RETRIES; i++) { 230 Result res = htable2.get(get); 231 if (res.isEmpty()) { 232 LOG.info("Row not available"); 233 Thread.sleep(SLEEP_TIME); 234 } else { 235 assertArrayEquals(row, res.value()); 236 return; 237 } 238 } 239 fail("Waited too much time for put replication"); 240 } 241 242 /** 243 * Integration test for TestReplicationAdmin, removes and re-add a peer cluster 244 */ 245 @Test 246 public void testAddAndRemoveClusters() throws Exception { 247 LOG.info("testAddAndRemoveClusters"); 248 hbaseAdmin.removeReplicationPeer(PEER_ID); 249 Thread.sleep(SLEEP_TIME); 250 byte[] rowKey = Bytes.toBytes("Won't be replicated"); 251 Put put = new Put(rowKey); 252 put.addColumn(famName, row, row); 253 htable1.put(put); 254 255 Get get = new Get(rowKey); 256 for (int i = 0; i < NB_RETRIES; i++) { 257 if (i == NB_RETRIES - 1) { 258 break; 259 } 260 Result res = htable2.get(get); 261 if (res.size() >= 1) { 262 fail("Not supposed to be replicated"); 263 } else { 264 LOG.info("Row not replicated, let's wait a bit more..."); 265 Thread.sleep(SLEEP_TIME); 266 } 267 } 268 ReplicationPeerConfig rpc = 269 ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build(); 270 hbaseAdmin.addReplicationPeer(PEER_ID, rpc); 271 Thread.sleep(SLEEP_TIME); 272 rowKey = Bytes.toBytes("do rep"); 273 put = new Put(rowKey); 274 put.addColumn(famName, row, row); 275 LOG.info("Adding new row"); 276 htable1.put(put); 277 278 get = new Get(rowKey); 279 for (int i = 0; i < NB_RETRIES; i++) { 280 if (i == NB_RETRIES - 1) { 281 fail("Waited too much time for put replication"); 282 } 283 Result res = htable2.get(get); 284 if (res.isEmpty()) { 285 LOG.info("Row not available"); 286 Thread.sleep(SLEEP_TIME * i); 287 } else { 288 assertArrayEquals(row, res.value()); 289 break; 290 } 291 } 292 } 293 294 /** 295 * Do a more intense version testSmallBatch, one that will trigger wal rolling and other 296 * non-trivial code paths 297 */ 298 @Test 299 public void testLoading() throws Exception { 300 LOG.info("Writing out rows to table1 in testLoading"); 301 List<Put> puts = new ArrayList<>(NB_ROWS_IN_BIG_BATCH); 302 for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) { 303 Put put = new Put(Bytes.toBytes(i)); 304 put.addColumn(famName, row, row); 305 puts.add(put); 306 } 307 // The puts will be iterated through and flushed only when the buffer 308 // size is reached. 309 htable1.put(puts); 310 311 Scan scan = new Scan(); 312 313 ResultScanner scanner = htable1.getScanner(scan); 314 Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH); 315 scanner.close(); 316 317 assertEquals(NB_ROWS_IN_BIG_BATCH, res.length); 318 319 LOG.info("Looking in table2 for replicated rows in testLoading"); 320 long start = EnvironmentEdgeManager.currentTime(); 321 // Retry more than NB_RETRIES. As it was, retries were done in 5 seconds and we'd fail 322 // sometimes. 323 final long retries = NB_RETRIES * 10; 324 for (int i = 0; i < retries; i++) { 325 scan = new Scan(); 326 scanner = htable2.getScanner(scan); 327 res = scanner.next(NB_ROWS_IN_BIG_BATCH); 328 scanner.close(); 329 if (res.length != NB_ROWS_IN_BIG_BATCH) { 330 if (i == retries - 1) { 331 int lastRow = -1; 332 for (Result result : res) { 333 int currentRow = Bytes.toInt(result.getRow()); 334 for (int row = lastRow + 1; row < currentRow; row++) { 335 LOG.error("Row missing: " + row); 336 } 337 lastRow = currentRow; 338 } 339 LOG.error("Last row: " + lastRow); 340 fail("Waited too much time for normal batch replication, " + res.length + " instead of " 341 + NB_ROWS_IN_BIG_BATCH + "; waited=" + (EnvironmentEdgeManager.currentTime() - start) 342 + "ms"); 343 } else { 344 LOG.info("Only got " + res.length + " rows... retrying"); 345 Thread.sleep(SLEEP_TIME); 346 } 347 } else { 348 break; 349 } 350 } 351 } 352 353 /** 354 * Test for HBASE-8663 355 * <p> 356 * Create two new Tables with colfamilies enabled for replication then run 357 * ReplicationAdmin.listReplicated(). Finally verify the table:colfamilies. Note: 358 * TestReplicationAdmin is a better place for this testing but it would need mocks. 359 */ 360 @Test 361 public void testVerifyListReplicatedTable() throws Exception { 362 LOG.info("testVerifyListReplicatedTable"); 363 364 final String tName = "VerifyListReplicated_"; 365 final String colFam = "cf1"; 366 final int numOfTables = 3; 367 368 Admin hadmin = UTIL1.getAdmin(); 369 370 // Create Tables 371 for (int i = 0; i < numOfTables; i++) { 372 hadmin.createTable(TableDescriptorBuilder 373 .newBuilder(TableName.valueOf(tName + i)).setColumnFamily(ColumnFamilyDescriptorBuilder 374 .newBuilder(Bytes.toBytes(colFam)).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 375 .build()); 376 } 377 378 // verify the result 379 List<TableCFs> replicationColFams = hbaseAdmin.listReplicatedTableCFs(); 380 int[] match = new int[numOfTables]; // array of 3 with init value of zero 381 382 for (int i = 0; i < replicationColFams.size(); i++) { 383 TableCFs replicationEntry = replicationColFams.get(i); 384 String tn = replicationEntry.getTable().getNameAsString(); 385 if (tn.startsWith(tName) && replicationEntry.getColumnFamilyMap().containsKey(colFam)) { 386 int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit 387 match[m]++; // should only increase once 388 } 389 } 390 391 // check the matching result 392 for (int i = 0; i < match.length; i++) { 393 assertTrue("listReplicated() does not match table " + i, (match[i] == 1)); 394 } 395 396 // drop tables 397 for (int i = 0; i < numOfTables; i++) { 398 TableName tableName = TableName.valueOf(tName + i); 399 hadmin.disableTable(tableName); 400 hadmin.deleteTable(tableName); 401 } 402 403 hadmin.close(); 404 } 405 406 /** 407 * Test for HBase-15259 WALEdits under replay will also be replicated 408 */ 409 @Test 410 public void testReplicationInReplay() throws Exception { 411 final TableName tableName = htable1.getName(); 412 413 HRegion region = UTIL1.getMiniHBaseCluster().getRegions(tableName).get(0); 414 RegionInfo hri = region.getRegionInfo(); 415 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 416 for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) { 417 scopes.put(fam, 1); 418 } 419 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 420 int index = UTIL1.getMiniHBaseCluster().getServerWith(hri.getRegionName()); 421 WAL wal = UTIL1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo()); 422 final byte[] rowName = Bytes.toBytes("testReplicationInReplay"); 423 final byte[] qualifier = Bytes.toBytes("q"); 424 final byte[] value = Bytes.toBytes("v"); 425 WALEdit edit = new WALEdit(true); 426 long now = EnvironmentEdgeManager.currentTime(); 427 edit.add(new KeyValue(rowName, famName, qualifier, now, value)); 428 WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes); 429 wal.appendData(hri, walKey, edit); 430 wal.sync(); 431 432 Get get = new Get(rowName); 433 for (int i = 0; i < NB_RETRIES; i++) { 434 if (i == NB_RETRIES - 1) { 435 break; 436 } 437 Result res = htable2.get(get); 438 if (res.size() >= 1) { 439 fail("Not supposed to be replicated for " + Bytes.toString(res.getRow())); 440 } else { 441 LOG.info("Row not replicated, let's wait a bit more..."); 442 Thread.sleep(SLEEP_TIME); 443 } 444 } 445 } 446 447 /** 448 * Test for HBASE-27448 Add an admin method to get replication enabled state 449 */ 450 @Test 451 public void testGetReplicationPeerState() throws Exception { 452 453 // Test disable replication peer 454 hbaseAdmin.disableReplicationPeer("2"); 455 assertFalse(hbaseAdmin.isReplicationPeerEnabled("2")); 456 457 // Test enable replication peer 458 hbaseAdmin.enableReplicationPeer("2"); 459 assertTrue(hbaseAdmin.isReplicationPeerEnabled("2")); 460 } 461}