001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertEquals; 021import static org.mockito.ArgumentMatchers.any; 022import static org.mockito.Mockito.when; 023 024import java.io.IOException; 025import java.util.concurrent.ThreadLocalRandom; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.KeyValue; 028import org.apache.hadoop.hbase.client.Get; 029import org.apache.hadoop.hbase.client.Result; 030import org.apache.hadoop.hbase.client.Scan; 031import org.apache.hadoop.hbase.client.Table; 032import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; 033import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplicationRecompareRunnable; 034import org.apache.hadoop.hbase.testclassification.ReplicationTests; 035import org.apache.hadoop.hbase.testclassification.SmallTests; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.mapreduce.Counter; 038import org.apache.hadoop.mapreduce.Mapper; 039import org.apache.hadoop.mapreduce.counters.GenericCounter; 040import org.junit.Before; 041import org.junit.ClassRule; 042import org.junit.Test; 043import org.junit.experimental.categories.Category; 044import org.junit.runner.RunWith; 045import org.mockito.Mock; 046import org.mockito.junit.MockitoJUnitRunner; 047 048@Category({ ReplicationTests.class, SmallTests.class }) 049@RunWith(MockitoJUnitRunner.class) 050public class TestVerifyReplicationRecompareRunnable { 051 052 @ClassRule 053 public static final HBaseClassTestRule CLASS_RULE = 054 HBaseClassTestRule.forClass(TestVerifyReplicationRecompareRunnable.class); 055 056 @Mock 057 private Table sourceTable; 058 059 @Mock 060 private Table replicatedTable; 061 062 @Mock 063 private Mapper.Context context; 064 065 static Result genResult(int cols) { 066 KeyValue[] kvs = new KeyValue[cols]; 067 068 for (int i = 0; i < cols; ++i) { 069 kvs[i] = 070 new KeyValue(genBytes(), genBytes(), genBytes(), System.currentTimeMillis(), genBytes()); 071 } 072 073 return Result.create(kvs); 074 } 075 076 static byte[] genBytes() { 077 return Bytes.toBytes(ThreadLocalRandom.current().nextInt()); 078 } 079 080 @Before 081 public void setUp() { 082 for (VerifyReplication.Verifier.Counters counter : VerifyReplication.Verifier.Counters 083 .values()) { 084 Counter emptyCounter = new GenericCounter(counter.name(), counter.name()); 085 when(context.getCounter(counter)).thenReturn(emptyCounter); 086 } 087 } 088 089 @Test 090 public void itRecomparesGoodRow() throws IOException { 091 Result result = genResult(2); 092 093 when(sourceTable.get(any(Get.class))).thenReturn(result); 094 when(replicatedTable.get(any(Get.class))).thenReturn(result); 095 096 VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, 097 genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "", 098 new Scan(), sourceTable, replicatedTable, 3, 1, 0, true); 099 100 runnable.run(); 101 102 assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); 103 assertEquals(0, 104 context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue()); 105 assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); 106 assertEquals(1, 107 context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue()); 108 assertEquals(1, 109 context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue()); 110 assertEquals(2, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue()); 111 } 112 113 @Test 114 public void itRecomparesBadRow() throws IOException { 115 Result replicatedResult = genResult(1); 116 when(sourceTable.get(any(Get.class))).thenReturn(genResult(5)); 117 when(replicatedTable.get(any(Get.class))).thenReturn(replicatedResult); 118 119 VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, 120 genResult(5), replicatedResult, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, 121 "", new Scan(), sourceTable, replicatedTable, 1, 1, 0, true); 122 123 runnable.run(); 124 125 assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); 126 assertEquals(1, 127 context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue()); 128 assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); 129 assertEquals(1, 130 context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue()); 131 assertEquals(0, 132 context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue()); 133 assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue()); 134 } 135 136 @Test 137 public void itHandlesExceptionOnRecompare() throws IOException { 138 when(sourceTable.get(any(Get.class))).thenThrow(new IOException("Error!")); 139 when(replicatedTable.get(any(Get.class))).thenReturn(genResult(5)); 140 141 VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, 142 genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "", 143 new Scan(), sourceTable, replicatedTable, 1, 1, 0, true); 144 145 runnable.run(); 146 147 assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); 148 assertEquals(1, 149 context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue()); 150 assertEquals(1, 151 context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue()); 152 assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue()); 153 } 154}