001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.mockito.ArgumentMatchers.any;
022import static org.mockito.Mockito.when;
023
024import java.io.IOException;
025import java.util.concurrent.ThreadLocalRandom;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.KeyValue;
028import org.apache.hadoop.hbase.client.Get;
029import org.apache.hadoop.hbase.client.Result;
030import org.apache.hadoop.hbase.client.Scan;
031import org.apache.hadoop.hbase.client.Table;
032import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
033import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplicationRecompareRunnable;
034import org.apache.hadoop.hbase.testclassification.ReplicationTests;
035import org.apache.hadoop.hbase.testclassification.SmallTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.mapreduce.Counter;
038import org.apache.hadoop.mapreduce.Mapper;
039import org.apache.hadoop.mapreduce.counters.GenericCounter;
040import org.junit.Before;
041import org.junit.ClassRule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044import org.junit.runner.RunWith;
045import org.mockito.Mock;
046import org.mockito.junit.MockitoJUnitRunner;
047
048@Category({ ReplicationTests.class, SmallTests.class })
049@RunWith(MockitoJUnitRunner.class)
050public class TestVerifyReplicationRecompareRunnable {
051
052  @ClassRule
053  public static final HBaseClassTestRule CLASS_RULE =
054    HBaseClassTestRule.forClass(TestVerifyReplicationRecompareRunnable.class);
055
056  @Mock
057  private Table sourceTable;
058
059  @Mock
060  private Table replicatedTable;
061
062  @Mock
063  private Mapper.Context context;
064
065  static Result genResult(int cols) {
066    KeyValue[] kvs = new KeyValue[cols];
067
068    for (int i = 0; i < cols; ++i) {
069      kvs[i] =
070        new KeyValue(genBytes(), genBytes(), genBytes(), System.currentTimeMillis(), genBytes());
071    }
072
073    return Result.create(kvs);
074  }
075
076  static byte[] genBytes() {
077    return Bytes.toBytes(ThreadLocalRandom.current().nextInt());
078  }
079
080  @Before
081  public void setUp() {
082    for (VerifyReplication.Verifier.Counters counter : VerifyReplication.Verifier.Counters
083      .values()) {
084      Counter emptyCounter = new GenericCounter(counter.name(), counter.name());
085      when(context.getCounter(counter)).thenReturn(emptyCounter);
086    }
087  }
088
089  @Test
090  public void itRecomparesGoodRow() throws IOException {
091    Result result = genResult(2);
092
093    when(sourceTable.get(any(Get.class))).thenReturn(result);
094    when(replicatedTable.get(any(Get.class))).thenReturn(result);
095
096    VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
097      genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "",
098      new Scan(), sourceTable, replicatedTable, 3, 1, 0, true);
099
100    runnable.run();
101
102    assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
103    assertEquals(0,
104      context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
105    assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
106    assertEquals(1,
107      context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue());
108    assertEquals(1,
109      context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue());
110    assertEquals(2, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
111  }
112
113  @Test
114  public void itRecomparesBadRow() throws IOException {
115    Result replicatedResult = genResult(1);
116    when(sourceTable.get(any(Get.class))).thenReturn(genResult(5));
117    when(replicatedTable.get(any(Get.class))).thenReturn(replicatedResult);
118
119    VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
120      genResult(5), replicatedResult, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS,
121      "", new Scan(), sourceTable, replicatedTable, 1, 1, 0, true);
122
123    runnable.run();
124
125    assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
126    assertEquals(1,
127      context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
128    assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
129    assertEquals(1,
130      context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue());
131    assertEquals(0,
132      context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue());
133    assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
134  }
135
136  @Test
137  public void itHandlesExceptionOnRecompare() throws IOException {
138    when(sourceTable.get(any(Get.class))).thenThrow(new IOException("Error!"));
139    when(replicatedTable.get(any(Get.class))).thenReturn(genResult(5));
140
141    VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
142      genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "",
143      new Scan(), sourceTable, replicatedTable, 1, 1, 0, true);
144
145    runnable.run();
146
147    assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
148    assertEquals(1,
149      context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
150    assertEquals(1,
151      context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
152    assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
153  }
154}