/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Tests for DumpReplicationQueues tool.
 * <p>
 * Each test method starts its own mini cluster in {@link #setup()} and shuts it down again in
 * {@link #tearDown()}.
 */
@Category({ ReplicationTests.class, SmallTests.class })
public class TestDumpReplicationQueues {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestDumpReplicationQueues.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static Configuration CONF;
  private static FileSystem FS = null;
  private Path root;
  private Path logDir;
  @Rule
  public final TestName name = new TestName();

  /**
   * Starts the mini cluster, creates a replication queue table named after the running test
   * method, and points the root dir and WAL root dir of the configuration at a fresh test
   * directory that contains an (initially empty) WAL directory.
   */
  @Before
  public void setup() throws Exception {
    UTIL.startMiniCluster(3);
    CONF = UTIL.getConfiguration();
    // One queue table per test method, so the table name is derived from the method name.
    TableName tableName = TableName.valueOf("replication_" + name.getMethodName());
    UTIL.getAdmin()
      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
    CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());
    FS = FileSystem.get(CONF);
    root = UTIL.getDataTestDirOnTestFS("hbase");
    logDir = new Path(root, HConstants.HREGION_LOGDIR_NAME);
    FS.mkdirs(logDir);
    CommonFSUtils.setRootDir(CONF, root);
    CommonFSUtils.setWALRootDir(CONF, root);
  }

  /**
   * Creates four WAL files for a single server, stores offset 123 for the first one, and then
   * verifies both dump flavours: the queue dump (queue id, WAL count, WALs listed in sorted order
   * with their positions) and the table based dump (peer state, WAL offset, hfile refs).
   */
  @Test
  public void testDumpReplication() throws Exception {
    String peerId = "1";
    String serverNameStr = "rs1,12345,123";
    addPeer(peerId, "hbase");
    ServerName serverName = ServerName.valueOf(serverNameStr);
    String walName = "rs1%2C12345%2C123.10";
    Path walPath = new Path(logDir, serverNameStr + "/" + walName);
    FS.createNewFile(walPath);

    ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId);
    ReplicationQueueStorage queueStorage =
      ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), CONF);
    // Only the first WAL gets an explicit offset (123); the assertions below expect the
    // remaining WALs to be reported at position 0.
    queueStorage.setOffset(queueId, "wal-group",
      new ReplicationGroupOffset(FS.listStatus(walPath)[0].getPath().toString(), 123),
      Collections.emptyMap());

    DumpReplicationQueues dumpQueues = new DumpReplicationQueues();
    Set<String> peerIds = new HashSet<>();
    peerIds.add(peerId);
    // Deliberately created out of order, so that the sorted output checked below is meaningful.
    List<String> wals = new ArrayList<>();
    wals.add("rs1%2C12345%2C123.12");
    wals.add("rs1%2C12345%2C123.15");
    wals.add("rs1%2C12345%2C123.11");
    for (String wal : wals) {
      Path wPath = new Path(logDir, serverNameStr + "/" + wal);
      FS.createNewFile(wPath);
    }

    String dump = dumpQueues.dumpQueues(UTIL.getConnection(), peerIds, false, CONF);
    assertTrue("Queue id not found in dump: " + dump,
      dump.contains("Queue id: 1-rs1,12345,123"));
    assertTrue("Unexpected number of WALs in dump: " + dump,
      dump.contains("Number of WALs in replication queue: 4"));
    // test for 'Returns wal sorted'
    String[] parsedDump = dump.split("Replication position for");
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException when the dump
    // does not contain one position entry per WAL (text before the first entry + 4 entries).
    assertTrue("Expected 4 'Replication position for' entries, but got dump: " + dump,
      parsedDump.length >= 5);
    assertTrue("First wal should be rs1%2C12345%2C123.10: 123, but got: " + parsedDump[1],
      parsedDump[1].contains("rs1%2C12345%2C123.10: 123"));
    assertTrue("Second wal should be rs1%2C12345%2C123.11: 0, but got: " + parsedDump[2],
      parsedDump[2].contains("rs1%2C12345%2C123.11: 0 (not started or nothing to replicate)"));
    assertTrue("Third wal should be rs1%2C12345%2C123.12: 0, but got: " + parsedDump[3],
      parsedDump[3].contains("rs1%2C12345%2C123.12: 0 (not started or nothing to replicate)"));
    assertTrue("Fourth wal should be rs1%2C12345%2C123.15: 0, but got: " + parsedDump[4],
      parsedDump[4].contains("rs1%2C12345%2C123.15: 0 (not started or nothing to replicate)"));

    Path file1 = new Path("testHFile1");
    Path file2 = new Path("testHFile2");
    List<Pair<Path, Path>> files = new ArrayList<>(2);
    files.add(new Pair<>(null, file1));
    files.add(new Pair<>(null, file2));
    queueStorage.addHFileRefs(peerId, files);
    // test for 'Dump Replication via replication table'
    String dump2 = dumpQueues.dumpReplicationViaTable(UTIL.getConnection(), CONF);
    assertTrue("Peer state not found in dump: " + dump2,
      dump2.contains("peers/1/peer-state: ENABLED"));
    assertTrue("WAL offset not found in dump: " + dump2,
      dump2.contains("rs1,12345,123/rs1%2C12345%2C123.10: 123"));
    assertTrue("HFile refs not found in dump: " + dump2,
      dump2.contains("hfile-refs/1/testHFile1,testHFile2"));
  }

  /**
   * Add a peer whose cluster key is the mini ZK cluster address followed by {@code ":/"} and the
   * given suffix, using the test replication endpoint.
   * @param peerId     id of the replication peer to add
   * @param clusterKey suffix appended to the ZK address to build the peer's cluster key
   * @throws IOException if adding the peer through the admin fails
   */
  private void addPeer(String peerId, String clusterKey) throws IOException {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
      .setReplicationEndpointImpl(
        TestReplicationSourceManager.ReplicationEndpointForTest.class.getName());
    // NOTE(review): the trailing 'true' presumably adds the peer in enabled state, which is what
    // the "peer-state: ENABLED" assertion in testDumpReplication relies on -- confirm.
    UTIL.getAdmin().addReplicationPeer(peerId, builder.build(), true);
  }

  /**
   * Shuts down the mini cluster started in {@link #setup()}.
   */
  @After
  public void tearDown() throws Exception {
    UTIL.shutdownMiniCluster();
  }
}