/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.master;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.RecoverStandbyProcedure;
import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

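/**
 * Test for {@link RecoverStandbyProcedure}: hand-written sync replication WALs are placed under
 * the peer's remote WAL directory, the procedure is submitted, and the replayed rows are then
 * read back from the target table.
 */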
@Category({ MasterTests.class, LargeTests.class })
public class TestRecoverStandbyProcedure {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRecoverStandbyProcedure.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRecoverStandbyProcedure.class);

  private static final TableName tableName = TableName.valueOf("TestRecoverStandbyProcedure");

  private static final RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();

  private static final byte[] family = Bytes.toBytes("CF");

  private static final byte[] qualifier = Bytes.toBytes("q");

  private static final long timestamp = EnvironmentEdgeManager.currentTime();

  private static final int ROW_COUNT = 1000;

  private static final int WAL_NUMBER = 10;

  private static final int RS_NUMBER = 3;

  private static final String PEER_ID = "1";

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private static SyncReplicationReplayWALManager syncReplicationReplayWALManager;

  private static ProcedureExecutor<MasterProcedureEnv> procExec;

  private static FileSystem fs;

  private static Configuration conf;

  @BeforeClass
  public static void setupCluster() throws Exception {
    UTIL.startMiniCluster(RS_NUMBER);
    UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
    conf = UTIL.getConfiguration();
    HMaster master = UTIL.getHBaseCluster().getMaster();
    fs = master.getMasterFileSystem().getWALFileSystem();
    syncReplicationReplayWALManager = master.getSyncReplicationReplayWALManager();
    procExec = master.getMasterProcedureExecutor();
  }

  @AfterClass
  public static void cleanupTest() throws Exception {
    try {
      UTIL.shutdownMiniCluster();
    } catch (Exception e) {
      LOG.warn("failure shutting down cluster", e);
    }
  }

  @Before
  public void setupBeforeTest() throws IOException {
    UTIL.createTable(tableName, family);
  }

  @After
  public void tearDownAfterTest() throws IOException {
    try (Admin admin = UTIL.getAdmin()) {
      if (admin.isTableEnabled(tableName)) {
        admin.disableTable(tableName);
      }
      admin.deleteTable(tableName);
    }
  }

  @Test
  public void testRecoverStandby() throws IOException, StreamLacksCapabilityException {
    setupSyncReplicationWALs();
    long procId = procExec.submitProcedure(new RecoverStandbyProcedure(PEER_ID, false));
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);

    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < WAL_NUMBER * ROW_COUNT; i++) {
        Result result = table.get(new Get(Bytes.toBytes(i)).setTimestamp(timestamp));
        assertNotNull(result);
        assertEquals(i, Bytes.toInt(result.getValue(family, qualifier)));
      }
    }
  }

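  /**
   * Writes {@code WAL_NUMBER} sync replication WAL files under the peer's remote WAL directory,
   * each containing {@code ROW_COUNT} puts, so the procedure has data to replay.
   */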
  private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException {
    Path peerRemoteWALDir = ReplicationUtils
      .getPeerRemoteWALDir(syncReplicationReplayWALManager.getRemoteWALDir(), PEER_ID);
    if (!fs.exists(peerRemoteWALDir)) {
      fs.mkdirs(peerRemoteWALDir);
    }
    for (int i = 0; i < WAL_NUMBER; i++) {
      try (ProtobufLogWriter writer = new ProtobufLogWriter()) {
        Path wal = new Path(peerRemoteWALDir, "srv1,8888." + i + ".syncrep");
        writer.init(fs, wal, conf, true, WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir),
          StreamSlowMonitor.create(conf, "defaultMonitor"));
        List<Entry> entries = setupWALEntries(i * ROW_COUNT, (i + 1) * ROW_COUNT);
        for (Entry entry : entries) {
          writer.append(entry);
        }
        writer.sync(false);
        LOG.info("Created wal {} to replay for peer id={}", wal, PEER_ID);
      }
    }
  }

  private List<Entry> setupWALEntries(int startRow, int endRow) {
    return IntStream.range(startRow, endRow)
      .mapToObj(i -> createWALEntry(Bytes.toBytes(i), Bytes.toBytes(i)))
      .collect(Collectors.toList());
  }

  private Entry createWALEntry(byte[] row, byte[] value) {
    WALKeyImpl key = new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, 1);
    WALEdit edit = new WALEdit();
    WALEditInternalHelper.addExtendedCell(edit,
      new KeyValue(row, family, qualifier, timestamp, value));
    return new Entry(key, edit);
  }
}