001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.util.ArrayList;
027import java.util.List;
028import java.util.NavigableMap;
029import java.util.TreeMap;
030import org.apache.hadoop.hbase.CellUtil;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.KeyValue;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
037import org.apache.hadoop.hbase.client.Delete;
038import org.apache.hadoop.hbase.client.Get;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.client.Result;
042import org.apache.hadoop.hbase.client.ResultScanner;
043import org.apache.hadoop.hbase.client.Scan;
044import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
045import org.apache.hadoop.hbase.client.replication.TableCFs;
046import org.apache.hadoop.hbase.regionserver.HRegion;
047import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
048import org.apache.hadoop.hbase.testclassification.LargeTests;
049import org.apache.hadoop.hbase.testclassification.ReplicationTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.apache.hadoop.hbase.wal.WAL;
053import org.apache.hadoop.hbase.wal.WALEdit;
054import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
055import org.apache.hadoop.hbase.wal.WALKeyImpl;
056import org.junit.Before;
057import org.junit.ClassRule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.runner.RunWith;
061import org.junit.runners.Parameterized;
062import org.junit.runners.Parameterized.Parameter;
063import org.junit.runners.Parameterized.Parameters;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
068
069@RunWith(Parameterized.class)
070@Category({ ReplicationTests.class, LargeTests.class })
071public class TestReplicationSmallTests extends TestReplicationBase {
072
  // Class-level HBase test rule bound to this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSmallTests.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSmallTests.class);
  // Id of the replication peer that the tests in this class disable, enable, remove and re-add.
  private static final String PEER_ID = "2";

  // Injected by the Parameterized runner from parameters(); returned as-is by isSerialPeer().
  @Parameter
  public boolean serialPeer;
082
083  @Override
084  protected boolean isSerialPeer() {
085    return serialPeer;
086  }
087
088  @Parameters(name = "{index}: serialPeer={0}")
089  public static List<Boolean> parameters() {
090    return ImmutableList.of(true, false);
091  }
092
093  @Before
094  public void setUp() throws Exception {
095    cleanUp();
096  }
097
098  /**
099   * Verify that version and column delete marker types are replicated correctly.
100   */
101  @Test
102  public void testDeleteTypes() throws Exception {
103    LOG.info("testDeleteTypes");
104    final byte[] v1 = Bytes.toBytes("v1");
105    final byte[] v2 = Bytes.toBytes("v2");
106    final byte[] v3 = Bytes.toBytes("v3");
107    htable1 = UTIL1.getConnection().getTable(tableName);
108
109    long t = EnvironmentEdgeManager.currentTime();
110    // create three versions for "row"
111    Put put = new Put(row);
112    put.addColumn(famName, row, t, v1);
113    htable1.put(put);
114
115    put = new Put(row);
116    put.addColumn(famName, row, t + 1, v2);
117    htable1.put(put);
118
119    put = new Put(row);
120    put.addColumn(famName, row, t + 2, v3);
121    htable1.put(put);
122
123    Get get = new Get(row);
124    get.readAllVersions();
125    for (int i = 0; i < NB_RETRIES; i++) {
126      if (i == NB_RETRIES - 1) {
127        fail("Waited too much time for put replication");
128      }
129      Result res = htable2.get(get);
130      if (res.size() < 3) {
131        LOG.info("Rows not available");
132        Thread.sleep(SLEEP_TIME);
133      } else {
134        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
135        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
136        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1);
137        break;
138      }
139    }
140    // place a version delete marker (delete last version)
141    Delete d = new Delete(row);
142    d.addColumn(famName, row, t);
143    htable1.delete(d);
144
145    get = new Get(row);
146    get.readAllVersions();
147    for (int i = 0; i < NB_RETRIES; i++) {
148      if (i == NB_RETRIES - 1) {
149        fail("Waited too much time for put replication");
150      }
151      Result res = htable2.get(get);
152      if (res.size() > 2) {
153        LOG.info("Version not deleted");
154        Thread.sleep(SLEEP_TIME);
155      } else {
156        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
157        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
158        break;
159      }
160    }
161
162    // place a column delete marker
163    d = new Delete(row);
164    d.addColumns(famName, row, t + 2);
165    htable1.delete(d);
166
167    // now *both* of the remaining version should be deleted
168    // at the replica
169    get = new Get(row);
170    for (int i = 0; i < NB_RETRIES; i++) {
171      if (i == NB_RETRIES - 1) {
172        fail("Waited too much time for del replication");
173      }
174      Result res = htable2.get(get);
175      if (res.size() >= 1) {
176        LOG.info("Rows not deleted");
177        Thread.sleep(SLEEP_TIME);
178      } else {
179        break;
180      }
181    }
182  }
183
184  /**
185   * Add a row, check it's replicated, delete it, check's gone
186   */
187  @Test
188  public void testSimplePutDelete() throws Exception {
189    LOG.info("testSimplePutDelete");
190    runSimplePutDeleteTest();
191  }
192
193  /**
194   * Try a small batch upload using the write buffer, check it's replicated
195   */
196  @Test
197  public void testSmallBatch() throws Exception {
198    LOG.info("testSmallBatch");
199    runSmallBatchTest();
200  }
201
202  /**
203   * Test disable/enable replication, trying to insert, make sure nothing's replicated, enable it,
204   * the insert should be replicated
205   */
206  @Test
207  public void testDisableEnable() throws Exception {
208    // Test disabling replication
209    hbaseAdmin.disableReplicationPeer(PEER_ID);
210
211    byte[] rowkey = Bytes.toBytes("disable enable");
212    Put put = new Put(rowkey);
213    put.addColumn(famName, row, row);
214    htable1.put(put);
215
216    Get get = new Get(rowkey);
217    for (int i = 0; i < NB_RETRIES; i++) {
218      Result res = htable2.get(get);
219      if (res.size() >= 1) {
220        fail("Replication wasn't disabled");
221      } else {
222        LOG.info("Row not replicated, let's wait a bit more...");
223        Thread.sleep(SLEEP_TIME);
224      }
225    }
226
227    // Test enable replication
228    hbaseAdmin.enableReplicationPeer(PEER_ID);
229
230    for (int i = 0; i < NB_RETRIES; i++) {
231      Result res = htable2.get(get);
232      if (res.isEmpty()) {
233        LOG.info("Row not available");
234        Thread.sleep(SLEEP_TIME);
235      } else {
236        assertArrayEquals(row, res.value());
237        return;
238      }
239    }
240    fail("Waited too much time for put replication");
241  }
242
243  /**
244   * Removes and re-add a peer cluster
245   */
246  @Test
247  public void testAddAndRemoveClusters() throws Exception {
248    LOG.info("testAddAndRemoveClusters");
249    hbaseAdmin.removeReplicationPeer(PEER_ID);
250    Thread.sleep(SLEEP_TIME);
251    byte[] rowKey = Bytes.toBytes("Won't be replicated");
252    Put put = new Put(rowKey);
253    put.addColumn(famName, row, row);
254    htable1.put(put);
255
256    Get get = new Get(rowKey);
257    for (int i = 0; i < NB_RETRIES; i++) {
258      if (i == NB_RETRIES - 1) {
259        break;
260      }
261      Result res = htable2.get(get);
262      if (res.size() >= 1) {
263        fail("Not supposed to be replicated");
264      } else {
265        LOG.info("Row not replicated, let's wait a bit more...");
266        Thread.sleep(SLEEP_TIME);
267      }
268    }
269    ReplicationPeerConfig rpc =
270      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI()).build();
271    hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
272    Thread.sleep(SLEEP_TIME);
273    rowKey = Bytes.toBytes("do rep");
274    put = new Put(rowKey);
275    put.addColumn(famName, row, row);
276    LOG.info("Adding new row");
277    htable1.put(put);
278
279    get = new Get(rowKey);
280    for (int i = 0; i < NB_RETRIES; i++) {
281      if (i == NB_RETRIES - 1) {
282        fail("Waited too much time for put replication");
283      }
284      Result res = htable2.get(get);
285      if (res.isEmpty()) {
286        LOG.info("Row not available");
287        Thread.sleep(SLEEP_TIME * i);
288      } else {
289        assertArrayEquals(row, res.value());
290        break;
291      }
292    }
293  }
294
295  /**
296   * Do a more intense version testSmallBatch, one that will trigger wal rolling and other
297   * non-trivial code paths
298   */
299  @Test
300  public void testLoading() throws Exception {
301    LOG.info("Writing out rows to table1 in testLoading");
302    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BIG_BATCH);
303    for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
304      Put put = new Put(Bytes.toBytes(i));
305      put.addColumn(famName, row, row);
306      puts.add(put);
307    }
308    // The puts will be iterated through and flushed only when the buffer
309    // size is reached.
310    htable1.put(puts);
311
312    Scan scan = new Scan();
313
314    ResultScanner scanner = htable1.getScanner(scan);
315    Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
316    scanner.close();
317
318    assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);
319
320    LOG.info("Looking in table2 for replicated rows in testLoading");
321    long start = EnvironmentEdgeManager.currentTime();
322    // Retry more than NB_RETRIES. As it was, retries were done in 5 seconds and we'd fail
323    // sometimes.
324    final long retries = NB_RETRIES * 10;
325    for (int i = 0; i < retries; i++) {
326      scan = new Scan();
327      scanner = htable2.getScanner(scan);
328      res = scanner.next(NB_ROWS_IN_BIG_BATCH);
329      scanner.close();
330      if (res.length != NB_ROWS_IN_BIG_BATCH) {
331        if (i == retries - 1) {
332          int lastRow = -1;
333          for (Result result : res) {
334            int currentRow = Bytes.toInt(result.getRow());
335            for (int row = lastRow + 1; row < currentRow; row++) {
336              LOG.error("Row missing: " + row);
337            }
338            lastRow = currentRow;
339          }
340          LOG.error("Last row: " + lastRow);
341          fail("Waited too much time for normal batch replication, " + res.length + " instead of "
342            + NB_ROWS_IN_BIG_BATCH + "; waited=" + (EnvironmentEdgeManager.currentTime() - start)
343            + "ms");
344        } else {
345          LOG.info("Only got " + res.length + " rows... retrying");
346          Thread.sleep(SLEEP_TIME);
347        }
348      } else {
349        break;
350      }
351    }
352  }
353
354  /**
355   * Test for HBASE-8663
356   * <p>
357   * Create two new Tables with colfamilies enabled for replication then run
358   * {@link Admin#listReplicatedTableCFs()}. Finally verify the table:colfamilies.
359   */
360  @Test
361  public void testVerifyListReplicatedTable() throws Exception {
362    LOG.info("testVerifyListReplicatedTable");
363
364    final String tName = "VerifyListReplicated_";
365    final String colFam = "cf1";
366    final int numOfTables = 3;
367
368    Admin hadmin = UTIL1.getAdmin();
369
370    // Create Tables
371    for (int i = 0; i < numOfTables; i++) {
372      hadmin.createTable(TableDescriptorBuilder
373        .newBuilder(TableName.valueOf(tName + i)).setColumnFamily(ColumnFamilyDescriptorBuilder
374          .newBuilder(Bytes.toBytes(colFam)).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
375        .build());
376    }
377
378    // verify the result
379    List<TableCFs> replicationColFams = hbaseAdmin.listReplicatedTableCFs();
380    int[] match = new int[numOfTables]; // array of 3 with init value of zero
381
382    for (int i = 0; i < replicationColFams.size(); i++) {
383      TableCFs replicationEntry = replicationColFams.get(i);
384      String tn = replicationEntry.getTable().getNameAsString();
385      if (tn.startsWith(tName) && replicationEntry.getColumnFamilyMap().containsKey(colFam)) {
386        int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit
387        match[m]++; // should only increase once
388      }
389    }
390
391    // check the matching result
392    for (int i = 0; i < match.length; i++) {
393      assertTrue("listReplicated() does not match table " + i, (match[i] == 1));
394    }
395
396    // drop tables
397    for (int i = 0; i < numOfTables; i++) {
398      TableName tableName = TableName.valueOf(tName + i);
399      hadmin.disableTable(tableName);
400      hadmin.deleteTable(tableName);
401    }
402
403    hadmin.close();
404  }
405
406  /**
407   * Test for HBase-15259 WALEdits under replay will also be replicated
408   */
409  @Test
410  public void testReplicationInReplay() throws Exception {
411    final TableName tableName = htable1.getName();
412
413    HRegion region = UTIL1.getMiniHBaseCluster().getRegions(tableName).get(0);
414    RegionInfo hri = region.getRegionInfo();
415    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
416    for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) {
417      scopes.put(fam, 1);
418    }
419    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
420    int index = UTIL1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
421    WAL wal = UTIL1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
422    final byte[] rowName = Bytes.toBytes("testReplicationInReplay");
423    final byte[] qualifier = Bytes.toBytes("q");
424    final byte[] value = Bytes.toBytes("v");
425    WALEdit edit = new WALEdit(true);
426    long now = EnvironmentEdgeManager.currentTime();
427    WALEditInternalHelper.addExtendedCell(edit,
428      new KeyValue(rowName, famName, qualifier, now, value));
429    WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes);
430    wal.appendData(hri, walKey, edit);
431    wal.sync();
432
433    Get get = new Get(rowName);
434    for (int i = 0; i < NB_RETRIES; i++) {
435      if (i == NB_RETRIES - 1) {
436        break;
437      }
438      Result res = htable2.get(get);
439      if (res.size() >= 1) {
440        fail("Not supposed to be replicated for " + Bytes.toString(res.getRow()));
441      } else {
442        LOG.info("Row not replicated, let's wait a bit more...");
443        Thread.sleep(SLEEP_TIME);
444      }
445    }
446  }
447
448  /**
449   * Test for HBASE-27448 Add an admin method to get replication enabled state
450   */
451  @Test
452  public void testGetReplicationPeerState() throws Exception {
453
454    // Test disable replication peer
455    hbaseAdmin.disableReplicationPeer("2");
456    assertFalse(hbaseAdmin.isReplicationPeerEnabled("2"));
457
458    // Test enable replication peer
459    hbaseAdmin.enableReplicationPeer("2");
460    assertTrue(hbaseAdmin.isReplicationPeerEnabled("2"));
461  }
462}