001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.master;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022
023import java.io.IOException;
024import java.util.List;
025import java.util.stream.Collectors;
026import java.util.stream.IntStream;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Admin;
035import org.apache.hadoop.hbase.client.Get;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.RegionInfoBuilder;
038import org.apache.hadoop.hbase.client.Result;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
041import org.apache.hadoop.hbase.master.HMaster;
042import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
043import org.apache.hadoop.hbase.master.replication.RecoverStandbyProcedure;
044import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
045import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
046import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
047import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
048import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
049import org.apache.hadoop.hbase.replication.ReplicationUtils;
050import org.apache.hadoop.hbase.testclassification.LargeTests;
051import org.apache.hadoop.hbase.testclassification.MasterTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
055import org.apache.hadoop.hbase.wal.WAL.Entry;
056import org.apache.hadoop.hbase.wal.WALEdit;
057import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
058import org.apache.hadoop.hbase.wal.WALKeyImpl;
059import org.junit.After;
060import org.junit.AfterClass;
061import org.junit.Before;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069@Category({ MasterTests.class, LargeTests.class })
070public class TestRecoverStandbyProcedure {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074    HBaseClassTestRule.forClass(TestRecoverStandbyProcedure.class);
075
076  private static final Logger LOG = LoggerFactory.getLogger(TestRecoverStandbyProcedure.class);
077
078  private static final TableName tableName = TableName.valueOf("TestRecoverStandbyProcedure");
079
080  private static final RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
081
082  private static final byte[] family = Bytes.toBytes("CF");
083
084  private static final byte[] qualifier = Bytes.toBytes("q");
085
086  private static final long timestamp = EnvironmentEdgeManager.currentTime();
087
088  private static final int ROW_COUNT = 1000;
089
090  private static final int WAL_NUMBER = 10;
091
092  private static final int RS_NUMBER = 3;
093
094  private static final String PEER_ID = "1";
095
096  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
097
098  private static SyncReplicationReplayWALManager syncReplicationReplayWALManager;
099
100  private static ProcedureExecutor<MasterProcedureEnv> procExec;
101
102  private static FileSystem fs;
103
104  private static Configuration conf;
105
106  @BeforeClass
107  public static void setupCluster() throws Exception {
108    UTIL.startMiniCluster(RS_NUMBER);
109    UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
110    conf = UTIL.getConfiguration();
111    HMaster master = UTIL.getHBaseCluster().getMaster();
112    fs = master.getMasterFileSystem().getWALFileSystem();
113    syncReplicationReplayWALManager = master.getSyncReplicationReplayWALManager();
114    procExec = master.getMasterProcedureExecutor();
115  }
116
117  @AfterClass
118  public static void cleanupTest() throws Exception {
119    try {
120      UTIL.shutdownMiniCluster();
121    } catch (Exception e) {
122      LOG.warn("failure shutting down cluster", e);
123    }
124  }
125
126  @Before
127  public void setupBeforeTest() throws IOException {
128    UTIL.createTable(tableName, family);
129  }
130
131  @After
132  public void tearDownAfterTest() throws IOException {
133    try (Admin admin = UTIL.getAdmin()) {
134      if (admin.isTableEnabled(tableName)) {
135        admin.disableTable(tableName);
136      }
137      admin.deleteTable(tableName);
138    }
139  }
140
141  @Test
142  public void testRecoverStandby() throws IOException, StreamLacksCapabilityException {
143    setupSyncReplicationWALs();
144    long procId = procExec.submitProcedure(new RecoverStandbyProcedure(PEER_ID, false));
145    ProcedureTestingUtility.waitProcedure(procExec, procId);
146    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
147
148    try (Table table = UTIL.getConnection().getTable(tableName)) {
149      for (int i = 0; i < WAL_NUMBER * ROW_COUNT; i++) {
150        Result result = table.get(new Get(Bytes.toBytes(i)).setTimestamp(timestamp));
151        assertNotNull(result);
152        assertEquals(i, Bytes.toInt(result.getValue(family, qualifier)));
153      }
154    }
155  }
156
157  private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException {
158    Path peerRemoteWALDir = ReplicationUtils
159      .getPeerRemoteWALDir(syncReplicationReplayWALManager.getRemoteWALDir(), PEER_ID);
160    if (!fs.exists(peerRemoteWALDir)) {
161      fs.mkdirs(peerRemoteWALDir);
162    }
163    for (int i = 0; i < WAL_NUMBER; i++) {
164      try (ProtobufLogWriter writer = new ProtobufLogWriter()) {
165        Path wal = new Path(peerRemoteWALDir, "srv1,8888." + i + ".syncrep");
166        writer.init(fs, wal, conf, true, WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir),
167          StreamSlowMonitor.create(conf, "defaultMonitor"));
168        List<Entry> entries = setupWALEntries(i * ROW_COUNT, (i + 1) * ROW_COUNT);
169        for (Entry entry : entries) {
170          writer.append(entry);
171        }
172        writer.sync(false);
173        LOG.info("Created wal {} to replay for peer id={}", wal, PEER_ID);
174      }
175    }
176  }
177
178  private List<Entry> setupWALEntries(int startRow, int endRow) {
179    return IntStream.range(startRow, endRow)
180      .mapToObj(i -> createWALEntry(Bytes.toBytes(i), Bytes.toBytes(i)))
181      .collect(Collectors.toList());
182  }
183
184  private Entry createWALEntry(byte[] row, byte[] value) {
185    WALKeyImpl key = new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, 1);
186    WALEdit edit = new WALEdit();
187    WALEditInternalHelper.addExtendedCell(edit,
188      new KeyValue(row, family, qualifier, timestamp, value));
189    return new Entry(key, edit);
190  }
191}