001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.util.ArrayList; 027import java.util.List; 028import java.util.NavigableMap; 029import java.util.TreeMap; 030import org.apache.hadoop.hbase.CellUtil; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.KeyValue; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Admin; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 037import org.apache.hadoop.hbase.client.Delete; 038import org.apache.hadoop.hbase.client.Get; 039import org.apache.hadoop.hbase.client.Put; 040import org.apache.hadoop.hbase.client.RegionInfo; 041import org.apache.hadoop.hbase.client.Result; 042import org.apache.hadoop.hbase.client.ResultScanner; 043import org.apache.hadoop.hbase.client.Scan; 044import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 045import org.apache.hadoop.hbase.client.replication.TableCFs; 046import org.apache.hadoop.hbase.regionserver.HRegion; 047import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 048import org.apache.hadoop.hbase.testclassification.LargeTests; 049import org.apache.hadoop.hbase.testclassification.ReplicationTests; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.hadoop.hbase.wal.WAL; 053import org.apache.hadoop.hbase.wal.WALEdit; 054import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 055import org.apache.hadoop.hbase.wal.WALKeyImpl; 056import org.junit.Before; 057import org.junit.ClassRule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.junit.runner.RunWith; 061import org.junit.runners.Parameterized; 062import org.junit.runners.Parameterized.Parameter; 063import org.junit.runners.Parameterized.Parameters; 064import org.slf4j.Logger; 065import org.slf4j.LoggerFactory; 066 067import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 068 069@RunWith(Parameterized.class) 070@Category({ ReplicationTests.class, LargeTests.class }) 071public class TestReplicationSmallTests extends TestReplicationBase { 072 073 @ClassRule 074 public static final HBaseClassTestRule CLASS_RULE = 075 HBaseClassTestRule.forClass(TestReplicationSmallTests.class); 076 077 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSmallTests.class); 078 private static final String PEER_ID = "2"; 079 080 @Parameter 081 public boolean serialPeer; 082 083 @Override 084 protected boolean isSerialPeer() { 085 return serialPeer; 086 } 087 088 @Parameters(name = "{index}: serialPeer={0}") 089 public static List<Boolean> parameters() { 090 return ImmutableList.of(true, false); 091 } 092 093 @Before 094 public void setUp() throws Exception { 095 cleanUp(); 096 } 097 098 /** 099 * Verify that version and column delete marker types are replicated correctly. 100 */ 101 @Test 102 public void testDeleteTypes() throws Exception { 103 LOG.info("testDeleteTypes"); 104 final byte[] v1 = Bytes.toBytes("v1"); 105 final byte[] v2 = Bytes.toBytes("v2"); 106 final byte[] v3 = Bytes.toBytes("v3"); 107 htable1 = UTIL1.getConnection().getTable(tableName); 108 109 long t = EnvironmentEdgeManager.currentTime(); 110 // create three versions for "row" 111 Put put = new Put(row); 112 put.addColumn(famName, row, t, v1); 113 htable1.put(put); 114 115 put = new Put(row); 116 put.addColumn(famName, row, t + 1, v2); 117 htable1.put(put); 118 119 put = new Put(row); 120 put.addColumn(famName, row, t + 2, v3); 121 htable1.put(put); 122 123 Get get = new Get(row); 124 get.readAllVersions(); 125 for (int i = 0; i < NB_RETRIES; i++) { 126 if (i == NB_RETRIES - 1) { 127 fail("Waited too much time for put replication"); 128 } 129 Result res = htable2.get(get); 130 if (res.size() < 3) { 131 LOG.info("Rows not available"); 132 Thread.sleep(SLEEP_TIME); 133 } else { 134 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3); 135 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2); 136 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1); 137 break; 138 } 139 } 140 // place a version delete marker (delete last version) 141 Delete d = new Delete(row); 142 d.addColumn(famName, row, t); 143 htable1.delete(d); 144 145 get = new Get(row); 146 get.readAllVersions(); 147 for (int i = 0; i < NB_RETRIES; i++) { 148 if (i == NB_RETRIES - 1) { 149 fail("Waited too much time for put replication"); 150 } 151 Result res = htable2.get(get); 152 if (res.size() > 2) { 153 LOG.info("Version not deleted"); 154 Thread.sleep(SLEEP_TIME); 155 } else { 156 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3); 157 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2); 158 break; 159 } 160 } 161 162 // place a column delete marker 163 d = new Delete(row); 164 d.addColumns(famName, row, t + 2); 165 htable1.delete(d); 166 167 // now *both* of the remaining version should be deleted 168 // at the replica 169 get = new Get(row); 170 for (int i = 0; i < NB_RETRIES; i++) { 171 if (i == NB_RETRIES - 1) { 172 fail("Waited too much time for del replication"); 173 } 174 Result res = htable2.get(get); 175 if (res.size() >= 1) { 176 LOG.info("Rows not deleted"); 177 Thread.sleep(SLEEP_TIME); 178 } else { 179 break; 180 } 181 } 182 } 183 184 /** 185 * Add a row, check it's replicated, delete it, check's gone 186 */ 187 @Test 188 public void testSimplePutDelete() throws Exception { 189 LOG.info("testSimplePutDelete"); 190 runSimplePutDeleteTest(); 191 } 192 193 /** 194 * Try a small batch upload using the write buffer, check it's replicated 195 */ 196 @Test 197 public void testSmallBatch() throws Exception { 198 LOG.info("testSmallBatch"); 199 runSmallBatchTest(); 200 } 201 202 /** 203 * Test disable/enable replication, trying to insert, make sure nothing's replicated, enable it, 204 * the insert should be replicated 205 */ 206 @Test 207 public void testDisableEnable() throws Exception { 208 // Test disabling replication 209 hbaseAdmin.disableReplicationPeer(PEER_ID); 210 211 byte[] rowkey = Bytes.toBytes("disable enable"); 212 Put put = new Put(rowkey); 213 put.addColumn(famName, row, row); 214 htable1.put(put); 215 216 Get get = new Get(rowkey); 217 for (int i = 0; i < NB_RETRIES; i++) { 218 Result res = htable2.get(get); 219 if (res.size() >= 1) { 220 fail("Replication wasn't disabled"); 221 } else { 222 LOG.info("Row not replicated, let's wait a bit more..."); 223 Thread.sleep(SLEEP_TIME); 224 } 225 } 226 227 // Test enable replication 228 hbaseAdmin.enableReplicationPeer(PEER_ID); 229 230 for (int i = 0; i < NB_RETRIES; i++) { 231 Result res = htable2.get(get); 232 if (res.isEmpty()) { 233 LOG.info("Row not available"); 234 Thread.sleep(SLEEP_TIME); 235 } else { 236 assertArrayEquals(row, res.value()); 237 return; 238 } 239 } 240 fail("Waited too much time for put replication"); 241 } 242 243 /** 244 * Removes and re-add a peer cluster 245 */ 246 @Test 247 public void testAddAndRemoveClusters() throws Exception { 248 LOG.info("testAddAndRemoveClusters"); 249 hbaseAdmin.removeReplicationPeer(PEER_ID); 250 Thread.sleep(SLEEP_TIME); 251 byte[] rowKey = Bytes.toBytes("Won't be replicated"); 252 Put put = new Put(rowKey); 253 put.addColumn(famName, row, row); 254 htable1.put(put); 255 256 Get get = new Get(rowKey); 257 for (int i = 0; i < NB_RETRIES; i++) { 258 if (i == NB_RETRIES - 1) { 259 break; 260 } 261 Result res = htable2.get(get); 262 if (res.size() >= 1) { 263 fail("Not supposed to be replicated"); 264 } else { 265 LOG.info("Row not replicated, let's wait a bit more..."); 266 Thread.sleep(SLEEP_TIME); 267 } 268 } 269 ReplicationPeerConfig rpc = 270 ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI()).build(); 271 hbaseAdmin.addReplicationPeer(PEER_ID, rpc); 272 Thread.sleep(SLEEP_TIME); 273 rowKey = Bytes.toBytes("do rep"); 274 put = new Put(rowKey); 275 put.addColumn(famName, row, row); 276 LOG.info("Adding new row"); 277 htable1.put(put); 278 279 get = new Get(rowKey); 280 for (int i = 0; i < NB_RETRIES; i++) { 281 if (i == NB_RETRIES - 1) { 282 fail("Waited too much time for put replication"); 283 } 284 Result res = htable2.get(get); 285 if (res.isEmpty()) { 286 LOG.info("Row not available"); 287 Thread.sleep(SLEEP_TIME * i); 288 } else { 289 assertArrayEquals(row, res.value()); 290 break; 291 } 292 } 293 } 294 295 /** 296 * Do a more intense version testSmallBatch, one that will trigger wal rolling and other 297 * non-trivial code paths 298 */ 299 @Test 300 public void testLoading() throws Exception { 301 LOG.info("Writing out rows to table1 in testLoading"); 302 List<Put> puts = new ArrayList<>(NB_ROWS_IN_BIG_BATCH); 303 for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) { 304 Put put = new Put(Bytes.toBytes(i)); 305 put.addColumn(famName, row, row); 306 puts.add(put); 307 } 308 // The puts will be iterated through and flushed only when the buffer 309 // size is reached. 310 htable1.put(puts); 311 312 Scan scan = new Scan(); 313 314 ResultScanner scanner = htable1.getScanner(scan); 315 Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH); 316 scanner.close(); 317 318 assertEquals(NB_ROWS_IN_BIG_BATCH, res.length); 319 320 LOG.info("Looking in table2 for replicated rows in testLoading"); 321 long start = EnvironmentEdgeManager.currentTime(); 322 // Retry more than NB_RETRIES. As it was, retries were done in 5 seconds and we'd fail 323 // sometimes. 324 final long retries = NB_RETRIES * 10; 325 for (int i = 0; i < retries; i++) { 326 scan = new Scan(); 327 scanner = htable2.getScanner(scan); 328 res = scanner.next(NB_ROWS_IN_BIG_BATCH); 329 scanner.close(); 330 if (res.length != NB_ROWS_IN_BIG_BATCH) { 331 if (i == retries - 1) { 332 int lastRow = -1; 333 for (Result result : res) { 334 int currentRow = Bytes.toInt(result.getRow()); 335 for (int row = lastRow + 1; row < currentRow; row++) { 336 LOG.error("Row missing: " + row); 337 } 338 lastRow = currentRow; 339 } 340 LOG.error("Last row: " + lastRow); 341 fail("Waited too much time for normal batch replication, " + res.length + " instead of " 342 + NB_ROWS_IN_BIG_BATCH + "; waited=" + (EnvironmentEdgeManager.currentTime() - start) 343 + "ms"); 344 } else { 345 LOG.info("Only got " + res.length + " rows... retrying"); 346 Thread.sleep(SLEEP_TIME); 347 } 348 } else { 349 break; 350 } 351 } 352 } 353 354 /** 355 * Test for HBASE-8663 356 * <p> 357 * Create two new Tables with colfamilies enabled for replication then run 358 * {@link Admin#listReplicatedTableCFs()}. Finally verify the table:colfamilies. 359 */ 360 @Test 361 public void testVerifyListReplicatedTable() throws Exception { 362 LOG.info("testVerifyListReplicatedTable"); 363 364 final String tName = "VerifyListReplicated_"; 365 final String colFam = "cf1"; 366 final int numOfTables = 3; 367 368 Admin hadmin = UTIL1.getAdmin(); 369 370 // Create Tables 371 for (int i = 0; i < numOfTables; i++) { 372 hadmin.createTable(TableDescriptorBuilder 373 .newBuilder(TableName.valueOf(tName + i)).setColumnFamily(ColumnFamilyDescriptorBuilder 374 .newBuilder(Bytes.toBytes(colFam)).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 375 .build()); 376 } 377 378 // verify the result 379 List<TableCFs> replicationColFams = hbaseAdmin.listReplicatedTableCFs(); 380 int[] match = new int[numOfTables]; // array of 3 with init value of zero 381 382 for (int i = 0; i < replicationColFams.size(); i++) { 383 TableCFs replicationEntry = replicationColFams.get(i); 384 String tn = replicationEntry.getTable().getNameAsString(); 385 if (tn.startsWith(tName) && replicationEntry.getColumnFamilyMap().containsKey(colFam)) { 386 int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit 387 match[m]++; // should only increase once 388 } 389 } 390 391 // check the matching result 392 for (int i = 0; i < match.length; i++) { 393 assertTrue("listReplicated() does not match table " + i, (match[i] == 1)); 394 } 395 396 // drop tables 397 for (int i = 0; i < numOfTables; i++) { 398 TableName tableName = TableName.valueOf(tName + i); 399 hadmin.disableTable(tableName); 400 hadmin.deleteTable(tableName); 401 } 402 403 hadmin.close(); 404 } 405 406 /** 407 * Test for HBase-15259 WALEdits under replay will also be replicated 408 */ 409 @Test 410 public void testReplicationInReplay() throws Exception { 411 final TableName tableName = htable1.getName(); 412 413 HRegion region = UTIL1.getMiniHBaseCluster().getRegions(tableName).get(0); 414 RegionInfo hri = region.getRegionInfo(); 415 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 416 for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) { 417 scopes.put(fam, 1); 418 } 419 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 420 int index = UTIL1.getMiniHBaseCluster().getServerWith(hri.getRegionName()); 421 WAL wal = UTIL1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo()); 422 final byte[] rowName = Bytes.toBytes("testReplicationInReplay"); 423 final byte[] qualifier = Bytes.toBytes("q"); 424 final byte[] value = Bytes.toBytes("v"); 425 WALEdit edit = new WALEdit(true); 426 long now = EnvironmentEdgeManager.currentTime(); 427 WALEditInternalHelper.addExtendedCell(edit, 428 new KeyValue(rowName, famName, qualifier, now, value)); 429 WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes); 430 wal.appendData(hri, walKey, edit); 431 wal.sync(); 432 433 Get get = new Get(rowName); 434 for (int i = 0; i < NB_RETRIES; i++) { 435 if (i == NB_RETRIES - 1) { 436 break; 437 } 438 Result res = htable2.get(get); 439 if (res.size() >= 1) { 440 fail("Not supposed to be replicated for " + Bytes.toString(res.getRow())); 441 } else { 442 LOG.info("Row not replicated, let's wait a bit more..."); 443 Thread.sleep(SLEEP_TIME); 444 } 445 } 446 } 447 448 /** 449 * Test for HBASE-27448 Add an admin method to get replication enabled state 450 */ 451 @Test 452 public void testGetReplicationPeerState() throws Exception { 453 454 // Test disable replication peer 455 hbaseAdmin.disableReplicationPeer("2"); 456 assertFalse(hbaseAdmin.isReplicationPeerEnabled("2")); 457 458 // Test enable replication peer 459 hbaseAdmin.enableReplicationPeer("2"); 460 assertTrue(hbaseAdmin.isReplicationPeerEnabled("2")); 461 } 462}