001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.CellUtil;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Delete;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.Result;
039import org.apache.hadoop.hbase.client.ResultScanner;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.Table;
042import org.apache.hadoop.hbase.client.TableDescriptor;
043import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
044import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
045import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
046import org.apache.hadoop.hbase.testclassification.LargeTests;
047import org.apache.hadoop.hbase.testclassification.ReplicationTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.CommonFSUtils;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.junit.AfterClass;
052import org.junit.Before;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Rule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.junit.rules.TestName;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
063
064/**
065 * We moved some of {@link TestVerifyReplicationZkClusterKey}'s tests here because it could take too
066 * long to complete. In here we have miscellaneous.
067 */
068@Category({ ReplicationTests.class, LargeTests.class })
069public class TestVerifyReplicationAdjunct extends TestReplicationBase {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073    HBaseClassTestRule.forClass(TestVerifyReplicationAdjunct.class);
074
075  private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplicationAdjunct.class);
076
077  private static final String PEER_ID = "2";
078  private static final TableName peerTableName = TableName.valueOf("peerTest");
079  private static Table htable3;
080
081  @Rule
082  public TestName name = new TestName();
083
084  @Override
085  protected String getClusterKey(HBaseTestingUtil util) throws Exception {
086    // TODO: VerifyReplication does not support connection uri yet, so here we need to use cluster
087    // key, as in this test we will pass the cluster key config in peer config directly to
088    // VerifyReplication job.
089    return util.getClusterKey();
090  }
091
092  @Before
093  public void setUp() throws Exception {
094    cleanUp();
095    UTIL2.deleteTableData(peerTableName);
096  }
097
098  @BeforeClass
099  public static void setUpBeforeClass() throws Exception {
100    TestReplicationBase.setUpBeforeClass();
101    TableDescriptor peerTable =
102      TableDescriptorBuilder.newBuilder(peerTableName)
103        .setColumnFamily(
104          ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100).build())
105        .build();
106    Connection connection2 = ConnectionFactory.createConnection(CONF2);
107    try (Admin admin2 = connection2.getAdmin()) {
108      admin2.createTable(peerTable, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
109    }
110    htable3 = connection2.getTable(peerTableName);
111  }
112
113  // VerifyReplication should honor versions option
114  @Test
115  public void testHBase14905() throws Exception {
116    // normal Batch tests
117    byte[] qualifierName = Bytes.toBytes("f1");
118    Put put = new Put(Bytes.toBytes("r1"));
119    long ts = EnvironmentEdgeManager.currentTime();
120    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1002"));
121    htable1.put(put);
122    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v1001"));
123    htable1.put(put);
124    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v1112"));
125    htable1.put(put);
126
127    Scan scan = new Scan();
128    scan.readVersions(100);
129    ResultScanner scanner1 = htable1.getScanner(scan);
130    Result[] res1 = scanner1.next(1);
131    scanner1.close();
132
133    assertEquals(1, res1.length);
134    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
135
136    for (int i = 0; i < NB_RETRIES; i++) {
137      scan = new Scan();
138      scan.readVersions(100);
139      scanner1 = htable2.getScanner(scan);
140      res1 = scanner1.next(1);
141      scanner1.close();
142      if (res1.length != 1) {
143        LOG.info("Only got " + res1.length + " rows");
144        Thread.sleep(SLEEP_TIME);
145      } else {
146        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
147        if (cellNumber != 3) {
148          LOG.info("Only got " + cellNumber + " cells");
149          Thread.sleep(SLEEP_TIME);
150        } else {
151          break;
152        }
153      }
154      if (i == NB_RETRIES - 1) {
155        fail("Waited too much time for normal batch replication");
156      }
157    }
158
159    put.addColumn(famName, qualifierName, ts + 4, Bytes.toBytes("v1111"));
160    htable2.put(put);
161    put.addColumn(famName, qualifierName, ts + 5, Bytes.toBytes("v1112"));
162    htable2.put(put);
163
164    scan = new Scan();
165    scan.readVersions(100);
166    scanner1 = htable2.getScanner(scan);
167    res1 = scanner1.next(NB_ROWS_IN_BATCH);
168    scanner1.close();
169
170    assertEquals(1, res1.length);
171    assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
172
173    String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() };
174    TestVerifyReplicationZkClusterKey.runVerifyReplication(args, 0, 1);
175  }
176
177  // VerifyReplication should honor versions option
178  @Test
179  public void testVersionMismatchHBase14905() throws Exception {
180    // normal Batch tests
181    byte[] qualifierName = Bytes.toBytes("f1");
182    Put put = new Put(Bytes.toBytes("r1"));
183    long ts = EnvironmentEdgeManager.currentTime();
184    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1"));
185    htable1.put(put);
186    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2"));
187    htable1.put(put);
188    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3"));
189    htable1.put(put);
190
191    Scan scan = new Scan();
192    scan.readVersions(100);
193    ResultScanner scanner1 = htable1.getScanner(scan);
194    Result[] res1 = scanner1.next(1);
195    scanner1.close();
196
197    assertEquals(1, res1.length);
198    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
199
200    for (int i = 0; i < NB_RETRIES; i++) {
201      scan = new Scan();
202      scan.readVersions(100);
203      scanner1 = htable2.getScanner(scan);
204      res1 = scanner1.next(1);
205      scanner1.close();
206      if (res1.length != 1) {
207        LOG.info("Only got " + res1.length + " rows");
208        Thread.sleep(SLEEP_TIME);
209      } else {
210        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
211        if (cellNumber != 3) {
212          LOG.info("Only got " + cellNumber + " cells");
213          Thread.sleep(SLEEP_TIME);
214        } else {
215          break;
216        }
217      }
218      if (i == NB_RETRIES - 1) {
219        fail("Waited too much time for normal batch replication");
220      }
221    }
222
223    try {
224      // Disabling replication and modifying the particular version of the cell to validate the
225      // feature.
226      hbaseAdmin.disableReplicationPeer(PEER_ID);
227      Put put2 = new Put(Bytes.toBytes("r1"));
228      put2.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v99"));
229      htable2.put(put2);
230
231      scan = new Scan();
232      scan.readVersions(100);
233      scanner1 = htable2.getScanner(scan);
234      res1 = scanner1.next(NB_ROWS_IN_BATCH);
235      scanner1.close();
236      assertEquals(1, res1.length);
237      assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
238
239      String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() };
240      TestVerifyReplicationZkClusterKey.runVerifyReplication(args, 0, 1);
241    } finally {
242      hbaseAdmin.enableReplicationPeer(PEER_ID);
243    }
244  }
245
246  @Test
247  public void testVerifyReplicationPrefixFiltering() throws Exception {
248    final byte[] prefixRow = Bytes.toBytes("prefixrow");
249    final byte[] prefixRow2 = Bytes.toBytes("secondrow");
250    loadData("prefixrow", prefixRow);
251    loadData("secondrow", prefixRow2);
252    loadData("aaa", row);
253    loadData("zzz", row);
254    waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4);
255    String[] args =
256      new String[] { "--row-prefixes=prefixrow,secondrow", PEER_ID, tableName.getNameAsString() };
257    TestVerifyReplicationZkClusterKey.runVerifyReplication(args, NB_ROWS_IN_BATCH * 2, 0);
258  }
259
260  @Test
261  public void testVerifyReplicationSnapshotArguments() {
262    String[] args =
263      new String[] { "--sourceSnapshotName=snapshot1", "2", tableName.getNameAsString() };
264    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
265
266    args = new String[] { "--sourceSnapshotTmpDir=tmp", "2", tableName.getNameAsString() };
267    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
268
269    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp", "2",
270      tableName.getNameAsString() };
271    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
272
273    args = new String[] { "--peerSnapshotName=snapshot1", "2", tableName.getNameAsString() };
274    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
275
276    args = new String[] { "--peerSnapshotTmpDir=/tmp/", "2", tableName.getNameAsString() };
277    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
278
279    args = new String[] { "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/",
280      "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2",
281      tableName.getNameAsString() };
282    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
283
284    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/",
285      "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs",
286      "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", tableName.getNameAsString() };
287
288    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
289  }
290
291  @Test
292  public void testVerifyReplicationWithSnapshotSupport() throws Exception {
293    // Populate the tables, at the same time it guarantees that the tables are
294    // identical since it does the check
295    runSmallBatchTest();
296
297    // Take source and target tables snapshot
298    Path rootDir = CommonFSUtils.getRootDir(CONF1);
299    FileSystem fs = rootDir.getFileSystem(CONF1);
300    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
301    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
302      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
303
304    // Take target snapshot
305    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
306    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
307    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
308    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
309      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
310
311    String peerFSAddress = peerFs.getUri().toString();
312    String temPath1 = UTIL1.getRandomDir().toString();
313    String temPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();
314
315    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
316      "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
317      "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
318      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), "2",
319      tableName.getNameAsString() };
320    TestVerifyReplicationZkClusterKey.runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
321    TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF1, temPath1, 1);
322    TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF2, temPath2, 1);
323
324    Scan scan = new Scan();
325    ResultScanner rs = htable2.getScanner(scan);
326    Put put = null;
327    for (Result result : rs) {
328      put = new Put(result.getRow());
329      Cell firstVal = result.rawCells()[0];
330      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
331        Bytes.toBytes("diff data"));
332      htable2.put(put);
333    }
334    Delete delete = new Delete(put.getRow());
335    htable2.delete(delete);
336
337    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
338    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
339      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
340
341    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
342    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
343      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
344
345    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
346      "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
347      "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
348      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), "2",
349      tableName.getNameAsString() };
350    TestVerifyReplicationZkClusterKey.runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
351    TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF1, temPath1, 2);
352    TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF2, temPath2, 2);
353  }
354
355  @AfterClass
356  public static void tearDownAfterClass() throws Exception {
357    htable3.close();
358    TestReplicationBase.tearDownAfterClass();
359  }
360}