/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

@RunWith(Parameterized.class)
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSmallTests extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSmallTests.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSmallTests.class);
  private static final String PEER_ID = "2";

  @Parameter
  public boolean serialPeer;

  @Override
  protected boolean isSerialPeer() {
    return serialPeer;
  }

  @Parameters(name = "{index}: serialPeer={0}")
  public static List<Boolean> parameters() {
    return ImmutableList.of(true, false);
  }

  @Before
  public void setUp() throws Exception {
    cleanUp();
  }

  /**
   * Verify that version and column delete marker types are replicated correctly.
   */
  @Test
  public void testDeleteTypes() throws Exception {
    LOG.info("testDeleteTypes");
    final byte[] v1 = Bytes.toBytes("v1");
    final byte[] v2 = Bytes.toBytes("v2");
    final byte[] v3 = Bytes.toBytes("v3");
    htable1 = UTIL1.getConnection().getTable(tableName);

    long t = EnvironmentEdgeManager.currentTime();
    // create three versions for "row"
    Put put = new Put(row);
    put.addColumn(famName, row, t, v1);
    htable1.put(put);

    put = new Put(row);
    put.addColumn(famName, row, t + 1, v2);
    htable1.put(put);

    put = new Put(row);
    put.addColumn(famName, row, t + 2, v3);
    htable1.put(put);

    Get get = new Get(row);
    get.readAllVersions();
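    // poll the peer cluster until all three versions have been replicated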
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() < 3) {
        LOG.info("Rows not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1);
        break;
      }
    }
    // place a version delete marker (deletes the single version written at timestamp t, i.e. v1)
    Delete d = new Delete(row);
    d.addColumn(famName, row, t);
    htable1.delete(d);

    get = new Get(row);
    get.readAllVersions();
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() > 2) {
        LOG.info("Version not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
        break;
      }
    }

    // place a column delete marker (deletes all versions at or before timestamp t + 2)
    d = new Delete(row);
    d.addColumns(famName, row, t + 2);
    htable1.delete(d);

    // now *both* of the remaining versions should be deleted
    // at the replica
    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Rows not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  /**
   * Add a row, check that it's replicated, delete it, and check that it's gone.
   */
  @Test
  public void testSimplePutDelete() throws Exception {
    LOG.info("testSimplePutDelete");
    runSimplePutDeleteTest();
  }

  /**
   * Try a small batch upload using the write buffer, and check that it's replicated.
   */
  @Test
  public void testSmallBatch() throws Exception {
    LOG.info("testSmallBatch");
    runSmallBatchTest();
  }

  /**
   * Test disabling replication: insert a row and make sure nothing is replicated, then re-enable
   * the peer and verify that the insert does get replicated.
   */
  @Test
  public void testDisableEnable() throws Exception {
    // Test disabling replication
    hbaseAdmin.disableReplicationPeer(PEER_ID);

    byte[] rowkey = Bytes.toBytes("disable enable");
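    // the shared 'row' bytes double as both the qualifier and the value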
    Put put = new Put(rowkey);
    put.addColumn(famName, row, row);
    htable1.put(put);

    Get get = new Get(rowkey);
    for (int i = 0; i < NB_RETRIES; i++) {
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Replication wasn't disabled");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }

    // Test enabling replication
    hbaseAdmin.enableReplicationPeer(PEER_ID);

    for (int i = 0; i < NB_RETRIES; i++) {
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        return;
      }
    }
    fail("Waited too much time for put replication");
  }

  /**
   * Integration test for TestReplicationAdmin: removes and re-adds a peer cluster.
   */
  @Test
  public void testAddAndRemoveClusters() throws Exception {
    LOG.info("testAddAndRemoveClusters");
    hbaseAdmin.removeReplicationPeer(PEER_ID);
    Thread.sleep(SLEEP_TIME);
    byte[] rowKey = Bytes.toBytes("Won't be replicated");
    Put put = new Put(rowKey);
    put.addColumn(famName, row, row);
    htable1.put(put);

    Get get = new Get(rowKey);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        break;
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Not supposed to be replicated");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }
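    // re-add the peer and verify that new writes start replicating again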
    ReplicationPeerConfig rpc =
      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build();
    hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
    Thread.sleep(SLEEP_TIME);
    rowKey = Bytes.toBytes("do rep");
    put = new Put(rowKey);
    put.addColumn(famName, row, row);
    LOG.info("Adding new row");
    htable1.put(put);

    get = new Get(rowKey);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
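        // back off a bit longer each attempt; the re-added peer may still be starting up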
        Thread.sleep(SLEEP_TIME * i);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }
  }

  /**
   * Do a more intense version of testSmallBatch, one that will trigger WAL rolling and other
   * non-trivial code paths.
   */
  @Test
  public void testLoading() throws Exception {
    LOG.info("Writing out rows to table1 in testLoading");
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BIG_BATCH);
    for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(i));
      put.addColumn(famName, row, row);
      puts.add(put);
    }
    // submit all the puts as a single batch
    htable1.put(puts);

    Scan scan = new Scan();

    ResultScanner scanner = htable1.getScanner(scan);
    Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
    scanner.close();

    assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);

    LOG.info("Looking in table2 for replicated rows in testLoading");
    long start = EnvironmentEdgeManager.currentTime();
    // Retry more than NB_RETRIES; with only NB_RETRIES attempts the whole wait was about 5
    // seconds and the test would sometimes fail.
    final long retries = NB_RETRIES * 10;
    for (int i = 0; i < retries; i++) {
      scan = new Scan();
      scanner = htable2.getScanner(scan);
      res = scanner.next(NB_ROWS_IN_BIG_BATCH);
      scanner.close();
      if (res.length != NB_ROWS_IN_BIG_BATCH) {
        if (i == retries - 1) {
          int lastRow = -1;
          for (Result result : res) {
            int currentRow = Bytes.toInt(result.getRow());
            for (int row = lastRow + 1; row < currentRow; row++) {
              LOG.error("Row missing: " + row);
            }
            lastRow = currentRow;
          }
          LOG.error("Last row: " + lastRow);
          fail("Waited too much time for normal batch replication, " + res.length + " instead of "
            + NB_ROWS_IN_BIG_BATCH + "; waited=" + (EnvironmentEdgeManager.currentTime() - start)
            + "ms");
        } else {
          LOG.info("Only got " + res.length + " rows... retrying");
          Thread.sleep(SLEEP_TIME);
        }
      } else {
        break;
      }
    }
  }

  /**
   * Test for HBASE-8663
   * <p>
   * Create three new tables with column families enabled for replication, then run
   * ReplicationAdmin.listReplicated(). Finally verify the table:column-family pairs. Note:
   * TestReplicationAdmin is a better place for this testing but it would need mocks.
   */
  @Test
  public void testVerifyListReplicatedTable() throws Exception {
    LOG.info("testVerifyListReplicatedTable");

    final String tName = "VerifyListReplicated_";
    final String colFam = "cf1";
    final int numOfTables = 3;

    Admin hadmin = UTIL1.getAdmin();

    // Create Tables
    for (int i = 0; i < numOfTables; i++) {
      hadmin.createTable(TableDescriptorBuilder
        .newBuilder(TableName.valueOf(tName + i)).setColumnFamily(ColumnFamilyDescriptorBuilder
          .newBuilder(Bytes.toBytes(colFam)).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .build());
    }

    // verify the result
    List<TableCFs> replicationColFams = hbaseAdmin.listReplicatedTableCFs();
    int[] match = new int[numOfTables]; // one counter per table, initialized to zero

    for (int i = 0; i < replicationColFams.size(); i++) {
      TableCFs replicationEntry = replicationColFams.get(i);
      String tn = replicationEntry.getTable().getNameAsString();
      if (tn.startsWith(tName) && replicationEntry.getColumnFamilyMap().containsKey(colFam)) {
        int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit
        match[m]++; // should only increase once
      }
    }

    // check the matching result
    for (int i = 0; i < match.length; i++) {
      assertTrue("listReplicated() does not match table " + i, (match[i] == 1));
    }

    // drop tables
    for (int i = 0; i < numOfTables; i++) {
      TableName tableName = TableName.valueOf(tName + i);
      hadmin.disableTable(tableName);
      hadmin.deleteTable(tableName);
    }

    hadmin.close();
  }

  /**
   * Test for HBASE-15259: WALEdits written as part of replay should not be replicated.
   */
  @Test
  public void testReplicationInReplay() throws Exception {
    final TableName tableName = htable1.getName();

    HRegion region = UTIL1.getMiniHBaseCluster().getRegions(tableName).get(0);
    RegionInfo hri = region.getRegionInfo();
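    // give every column family global replication scope (REPLICATION_SCOPE_GLOBAL == 1)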
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) {
      scopes.put(fam, 1);
    }
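    // look up the region server's WAL so the edit can be appended by hand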
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    int index = UTIL1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
    WAL wal = UTIL1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
    final byte[] rowName = Bytes.toBytes("testReplicationInReplay");
    final byte[] qualifier = Bytes.toBytes("q");
    final byte[] value = Bytes.toBytes("v");
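    // 'true' marks this edit as a replay edit, which replication is expected to skip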
    WALEdit edit = new WALEdit(true);
    long now = EnvironmentEdgeManager.currentTime();
    edit.add(new KeyValue(rowName, famName, qualifier, now, value));
    WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes);
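    // append the edit directly to the WAL and sync, bypassing the normal client write path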
    wal.appendData(hri, walKey, edit);
    wal.sync();

    Get get = new Get(rowName);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        break;
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Not supposed to be replicated for " + Bytes.toString(res.getRow()));
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }
  }

  /**
   * Test for HBASE-27448: add an admin method to get a replication peer's enabled state.
   */
  @Test
  public void testGetReplicationPeerState() throws Exception {
    // Test disabling the replication peer
    hbaseAdmin.disableReplicationPeer(PEER_ID);
    assertFalse(hbaseAdmin.isReplicationPeerEnabled(PEER_ID));

    // Test enabling the replication peer
    hbaseAdmin.enableReplicationPeer(PEER_ID);
    assertTrue(hbaseAdmin.isReplicationPeerEnabled(PEER_ID));
  }
}