001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.CellUtil;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Delete;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.Result;
039import org.apache.hadoop.hbase.client.ResultScanner;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.Table;
042import org.apache.hadoop.hbase.client.TableDescriptor;
043import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
044import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
045import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
046import org.apache.hadoop.hbase.testclassification.LargeTests;
047import org.apache.hadoop.hbase.testclassification.ReplicationTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.CommonFSUtils;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.junit.AfterClass;
052import org.junit.Before;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Rule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.junit.rules.TestName;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
063
064/**
 * Holds tests that were moved out of {@link TestVerifyReplication} because that class was taking
 * too long to complete. The tests collected here are a miscellaneous assortment.
067 */
068@Category({ ReplicationTests.class, LargeTests.class })
069public class TestVerifyReplicationAdjunct extends TestReplicationBase {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073    HBaseClassTestRule.forClass(TestVerifyReplicationAdjunct.class);
074
075  private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplicationAdjunct.class);
076
077  private static final String PEER_ID = "2";
078  private static final TableName peerTableName = TableName.valueOf("peerTest");
079  private static Table htable3;
080
081  @Rule
082  public TestName name = new TestName();
083
084  @Before
085  public void setUp() throws Exception {
086    cleanUp();
087    UTIL2.deleteTableData(peerTableName);
088  }
089
090  @BeforeClass
091  public static void setUpBeforeClass() throws Exception {
092    TestReplicationBase.setUpBeforeClass();
093    TableDescriptor peerTable =
094      TableDescriptorBuilder.newBuilder(peerTableName)
095        .setColumnFamily(
096          ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100).build())
097        .build();
098    Connection connection2 = ConnectionFactory.createConnection(CONF2);
099    try (Admin admin2 = connection2.getAdmin()) {
100      admin2.createTable(peerTable, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
101    }
102    htable3 = connection2.getTable(peerTableName);
103  }
104
105  // VerifyReplication should honor versions option
106  @Test
107  public void testHBase14905() throws Exception {
108    // normal Batch tests
109    byte[] qualifierName = Bytes.toBytes("f1");
110    Put put = new Put(Bytes.toBytes("r1"));
111    long ts = EnvironmentEdgeManager.currentTime();
112    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1002"));
113    htable1.put(put);
114    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v1001"));
115    htable1.put(put);
116    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v1112"));
117    htable1.put(put);
118
119    Scan scan = new Scan();
120    scan.readVersions(100);
121    ResultScanner scanner1 = htable1.getScanner(scan);
122    Result[] res1 = scanner1.next(1);
123    scanner1.close();
124
125    assertEquals(1, res1.length);
126    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
127
128    for (int i = 0; i < NB_RETRIES; i++) {
129      scan = new Scan();
130      scan.readVersions(100);
131      scanner1 = htable2.getScanner(scan);
132      res1 = scanner1.next(1);
133      scanner1.close();
134      if (res1.length != 1) {
135        LOG.info("Only got " + res1.length + " rows");
136        Thread.sleep(SLEEP_TIME);
137      } else {
138        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
139        if (cellNumber != 3) {
140          LOG.info("Only got " + cellNumber + " cells");
141          Thread.sleep(SLEEP_TIME);
142        } else {
143          break;
144        }
145      }
146      if (i == NB_RETRIES - 1) {
147        fail("Waited too much time for normal batch replication");
148      }
149    }
150
151    put.addColumn(famName, qualifierName, ts + 4, Bytes.toBytes("v1111"));
152    htable2.put(put);
153    put.addColumn(famName, qualifierName, ts + 5, Bytes.toBytes("v1112"));
154    htable2.put(put);
155
156    scan = new Scan();
157    scan.readVersions(100);
158    scanner1 = htable2.getScanner(scan);
159    res1 = scanner1.next(NB_ROWS_IN_BATCH);
160    scanner1.close();
161
162    assertEquals(1, res1.length);
163    assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
164
165    String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() };
166    TestVerifyReplication.runVerifyReplication(args, 0, 1);
167  }
168
169  // VerifyReplication should honor versions option
170  @Test
171  public void testVersionMismatchHBase14905() throws Exception {
172    // normal Batch tests
173    byte[] qualifierName = Bytes.toBytes("f1");
174    Put put = new Put(Bytes.toBytes("r1"));
175    long ts = EnvironmentEdgeManager.currentTime();
176    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1"));
177    htable1.put(put);
178    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2"));
179    htable1.put(put);
180    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3"));
181    htable1.put(put);
182
183    Scan scan = new Scan();
184    scan.readVersions(100);
185    ResultScanner scanner1 = htable1.getScanner(scan);
186    Result[] res1 = scanner1.next(1);
187    scanner1.close();
188
189    assertEquals(1, res1.length);
190    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
191
192    for (int i = 0; i < NB_RETRIES; i++) {
193      scan = new Scan();
194      scan.readVersions(100);
195      scanner1 = htable2.getScanner(scan);
196      res1 = scanner1.next(1);
197      scanner1.close();
198      if (res1.length != 1) {
199        LOG.info("Only got " + res1.length + " rows");
200        Thread.sleep(SLEEP_TIME);
201      } else {
202        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
203        if (cellNumber != 3) {
204          LOG.info("Only got " + cellNumber + " cells");
205          Thread.sleep(SLEEP_TIME);
206        } else {
207          break;
208        }
209      }
210      if (i == NB_RETRIES - 1) {
211        fail("Waited too much time for normal batch replication");
212      }
213    }
214
215    try {
216      // Disabling replication and modifying the particular version of the cell to validate the
217      // feature.
218      hbaseAdmin.disableReplicationPeer(PEER_ID);
219      Put put2 = new Put(Bytes.toBytes("r1"));
220      put2.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v99"));
221      htable2.put(put2);
222
223      scan = new Scan();
224      scan.readVersions(100);
225      scanner1 = htable2.getScanner(scan);
226      res1 = scanner1.next(NB_ROWS_IN_BATCH);
227      scanner1.close();
228      assertEquals(1, res1.length);
229      assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
230
231      String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() };
232      TestVerifyReplication.runVerifyReplication(args, 0, 1);
233    } finally {
234      hbaseAdmin.enableReplicationPeer(PEER_ID);
235    }
236  }
237
238  @Test
239  public void testVerifyReplicationPrefixFiltering() throws Exception {
240    final byte[] prefixRow = Bytes.toBytes("prefixrow");
241    final byte[] prefixRow2 = Bytes.toBytes("secondrow");
242    loadData("prefixrow", prefixRow);
243    loadData("secondrow", prefixRow2);
244    loadData("aaa", row);
245    loadData("zzz", row);
246    waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4);
247    String[] args =
248      new String[] { "--row-prefixes=prefixrow,secondrow", PEER_ID, tableName.getNameAsString() };
249    TestVerifyReplication.runVerifyReplication(args, NB_ROWS_IN_BATCH * 2, 0);
250  }
251
252  @Test
253  public void testVerifyReplicationSnapshotArguments() {
254    String[] args =
255      new String[] { "--sourceSnapshotName=snapshot1", "2", tableName.getNameAsString() };
256    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
257
258    args = new String[] { "--sourceSnapshotTmpDir=tmp", "2", tableName.getNameAsString() };
259    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
260
261    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp", "2",
262      tableName.getNameAsString() };
263    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
264
265    args = new String[] { "--peerSnapshotName=snapshot1", "2", tableName.getNameAsString() };
266    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
267
268    args = new String[] { "--peerSnapshotTmpDir=/tmp/", "2", tableName.getNameAsString() };
269    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
270
271    args = new String[] { "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/",
272      "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2",
273      tableName.getNameAsString() };
274    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
275
276    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/",
277      "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs",
278      "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", tableName.getNameAsString() };
279
280    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
281  }
282
283  @Test
284  public void testVerifyReplicationWithSnapshotSupport() throws Exception {
285    // Populate the tables, at the same time it guarantees that the tables are
286    // identical since it does the check
287    runSmallBatchTest();
288
289    // Take source and target tables snapshot
290    Path rootDir = CommonFSUtils.getRootDir(CONF1);
291    FileSystem fs = rootDir.getFileSystem(CONF1);
292    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
293    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
294      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
295
296    // Take target snapshot
297    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
298    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
299    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
300    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
301      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
302
303    String peerFSAddress = peerFs.getUri().toString();
304    String temPath1 = UTIL1.getRandomDir().toString();
305    String temPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();
306
307    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
308      "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
309      "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
310      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), "2",
311      tableName.getNameAsString() };
312    TestVerifyReplication.runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
313    TestVerifyReplication.checkRestoreTmpDir(CONF1, temPath1, 1);
314    TestVerifyReplication.checkRestoreTmpDir(CONF2, temPath2, 1);
315
316    Scan scan = new Scan();
317    ResultScanner rs = htable2.getScanner(scan);
318    Put put = null;
319    for (Result result : rs) {
320      put = new Put(result.getRow());
321      Cell firstVal = result.rawCells()[0];
322      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
323        Bytes.toBytes("diff data"));
324      htable2.put(put);
325    }
326    Delete delete = new Delete(put.getRow());
327    htable2.delete(delete);
328
329    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
330    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
331      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
332
333    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
334    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
335      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
336
337    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
338      "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
339      "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
340      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), "2",
341      tableName.getNameAsString() };
342    TestVerifyReplication.runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
343    TestVerifyReplication.checkRestoreTmpDir(CONF1, temPath1, 2);
344    TestVerifyReplication.checkRestoreTmpDir(CONF2, temPath2, 2);
345  }
346
347  @AfterClass
348  public static void tearDownAfterClass() throws Exception {
349    htable3.close();
350    TestReplicationBase.tearDownAfterClass();
351  }
352}