001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.NavigableMap;
024import java.util.TreeMap;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.KeyValue;
029import org.apache.hadoop.hbase.Waiter;
030import org.apache.hadoop.hbase.client.RegionInfo;
031import org.apache.hadoop.hbase.client.RegionInfoBuilder;
032import org.apache.hadoop.hbase.regionserver.HRegionServer;
033import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
034import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
035import org.apache.hadoop.hbase.replication.regionserver.Replication;
036import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
037import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.testclassification.ReplicationTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
042import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
043import org.apache.hadoop.hbase.wal.WAL;
044import org.apache.hadoop.hbase.wal.WALEdit;
045import org.apache.hadoop.hbase.wal.WALKeyImpl;
046import org.junit.Assert;
047import org.junit.Before;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051
052@Category({ ReplicationTests.class, LargeTests.class })
053public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
054  MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
055  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
056  NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
057
058  @ClassRule
059  public static final HBaseClassTestRule CLASS_RULE =
060    HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
061
062  @Before
063  public void setUp() throws IOException, InterruptedException {
064    cleanUp();
065    scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL);
066    replicateCount.set(0);
067    replicatedEntries.clear();
068  }
069
070  /**
071   * Waits until there is only one log(the current writing one) in the replication queue
072   * @param numRs number of region servers
073   */
074  private void waitForLogAdvance(int numRs) {
075    Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() {
076      @Override
077      public boolean evaluate() throws Exception {
078        for (int i = 0; i < numRs; i++) {
079          HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
080          RegionInfo regionInfo =
081            UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
082          WAL wal = hrs.getWAL(regionInfo);
083          Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
084          Replication replicationService =
085            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
086          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
087            .getSources()) {
088            ReplicationSource source = (ReplicationSource) rsi;
089            // We are making sure that there is only one log queue and that is for the
090            // current WAL of region server
091            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
092            if (
093              !currentFile.equals(source.getCurrentPath())
094                || source.getQueues().keySet().size() != 1
095                || source.getQueues().get(logPrefix).size() != 1
096            ) {
097              return false;
098            }
099          }
100        }
101        return true;
102      }
103    });
104  }
105
106  private void verifyNumberOfLogsInQueue(int numQueues, int numRs) {
107    Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
108      @Override
109      public boolean evaluate() {
110        for (int i = 0; i < numRs; i++) {
111          Replication replicationService =
112            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
113          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
114            .getSources()) {
115            ReplicationSource source = (ReplicationSource) rsi;
116            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
117            if (source.getQueues().get(logPrefix).size() != numQueues) {
118              return false;
119            }
120          }
121        }
122        return true;
123      }
124    });
125  }
126
127  @Test
128  public void testEmptyWALRecovery() throws Exception {
129    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
130    // for each RS, create an empty wal with same walGroupId
131    final List<Path> emptyWalPaths = new ArrayList<>();
132    long ts = EnvironmentEdgeManager.currentTime();
133    for (int i = 0; i < numRs; i++) {
134      RegionInfo regionInfo =
135        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
136      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
137      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
138      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
139      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
140      UTIL1.getTestFileSystem().create(emptyWalPath).close();
141      emptyWalPaths.add(emptyWalPath);
142    }
143
144    injectEmptyWAL(numRs, emptyWalPaths);
145
146    // ReplicationSource should advance past the empty wal, or else the test will fail
147    waitForLogAdvance(numRs);
148    verifyNumberOfLogsInQueue(1, numRs);
149    // we're now writing to the new wal
150    // if everything works, the source should've stopped reading from the empty wal, and start
151    // replicating from the new wal
152    runSimplePutDeleteTest();
153    rollWalsAndWaitForDeque(numRs);
154  }
155
156  /**
157   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
158   * see the empty and handle the EOF exception, we are able to ship the previous batch of entries
159   * without loosing it. This test also tests the number of batches shipped
160   * @throws Exception throws any exception
161   */
162  @Test
163  public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
164    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
165    // make sure we only the current active wal file in queue
166    verifyNumberOfLogsInQueue(1, numRs);
167
168    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
169    hbaseAdmin.disableReplicationPeer(PEER_ID2);
170
171    int numOfEntriesToReplicate = 20;
172    // for each RS, create an empty wal with same walGroupId
173    final List<Path> emptyWalPaths = new ArrayList<>();
174    long ts = EnvironmentEdgeManager.currentTime();
175    for (int i = 0; i < numRs; i++) {
176      RegionInfo regionInfo =
177        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
178      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
179      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
180
181      appendEntriesToWal(numOfEntriesToReplicate, wal);
182      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
183      Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
184      UTIL1.getTestFileSystem().create(emptyWalPath).close();
185      emptyWalPaths.add(emptyWalPath);
186    }
187
188    injectEmptyWAL(numRs, emptyWalPaths);
189    // There should be three WALs in queue
190    // 1. non empty WAL
191    // 2. empty WAL
192    // 3. live WAL
193    verifyNumberOfLogsInQueue(3, numRs);
194    hbaseAdmin.enableReplicationPeer(PEER_ID2);
195    // ReplicationSource should advance past the empty wal, or else the test will fail
196    waitForLogAdvance(numRs);
197
198    // Now we should expect numOfEntriesToReplicate entries
199    // replicated from each region server. This makes sure we didn't loose data
200    // from any previous batch when we encounter EOF exception for empty file.
201    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
202      replicatedEntries.size());
203
204    // We expect just one batch of replication which will
205    // be from when we handle the EOF exception.
206    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue());
207    verifyNumberOfLogsInQueue(1, numRs);
208    // we're now writing to the new wal
209    // if everything works, the source should've stopped reading from the empty wal, and start
210    // replicating from the new wal
211    runSimplePutDeleteTest();
212    rollWalsAndWaitForDeque(numRs);
213  }
214
215  /**
216   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
217   * see the empty WAL and handle the EOF exception, we are able to proceed with next batch and
218   * replicate it properly without missing data.
219   * @throws Exception throws any exception
220   */
221  @Test
222  public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception {
223    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
224    hbaseAdmin.disableReplicationPeer(PEER_ID2);
225    int numOfEntriesToReplicate = 20;
226
227    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
228    // for each RS, create an empty wal with same walGroupId
229    final List<Path> emptyWalPaths = new ArrayList<>();
230    long ts = EnvironmentEdgeManager.currentTime();
231    WAL wal = null;
232    for (int i = 0; i < numRs; i++) {
233      RegionInfo regionInfo =
234        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
235      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
236      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
237      appendEntriesToWal(numOfEntriesToReplicate, wal);
238      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
239      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
240      UTIL1.getTestFileSystem().create(emptyWalPath).close();
241      emptyWalPaths.add(emptyWalPath);
242
243    }
244    injectEmptyWAL(numRs, emptyWalPaths);
245    // roll the WAL now
246    for (int i = 0; i < numRs; i++) {
247      wal.rollWriter();
248    }
249    hbaseAdmin.enableReplicationPeer(PEER_ID2);
250    // ReplicationSource should advance past the empty wal, or else the test will fail
251    waitForLogAdvance(numRs);
252
253    // Now we should expect numOfEntriesToReplicate entries
254    // replicated from each region server. This makes sure we didn't loose data
255    // from any previous batch when we encounter EOF exception for empty file.
256    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
257      replicatedEntries.size());
258
259    // We expect just one batch of replication to be shipped which will
260    // for non empty WAL
261    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get());
262    verifyNumberOfLogsInQueue(1, numRs);
263    // we're now writing to the new wal
264    // if everything works, the source should've stopped reading from the empty wal, and start
265    // replicating from the new wal
266    runSimplePutDeleteTest();
267    rollWalsAndWaitForDeque(numRs);
268  }
269
270  /**
271   * This test make sure we replicate all the enties from the non empty WALs which are surrounding
272   * the empty WALs
273   * @throws Exception throws exception
274   */
275  @Test
276  public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception {
277    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
278    hbaseAdmin.disableReplicationPeer(PEER_ID2);
279    int numOfEntriesToReplicate = 20;
280
281    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
282    // for each RS, create an empty wal with same walGroupId
283    final List<Path> emptyWalPaths = new ArrayList<>();
284    long ts = EnvironmentEdgeManager.currentTime();
285    WAL wal = null;
286    for (int i = 0; i < numRs; i++) {
287      RegionInfo regionInfo =
288        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
289      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
290      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
291      appendEntriesToWal(numOfEntriesToReplicate, wal);
292      wal.rollWriter();
293      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
294      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
295      UTIL1.getTestFileSystem().create(emptyWalPath).close();
296      emptyWalPaths.add(emptyWalPath);
297    }
298    injectEmptyWAL(numRs, emptyWalPaths);
299
300    // roll the WAL again with some entries
301    for (int i = 0; i < numRs; i++) {
302      appendEntriesToWal(numOfEntriesToReplicate, wal);
303      wal.rollWriter();
304    }
305
306    hbaseAdmin.enableReplicationPeer(PEER_ID2);
307    // ReplicationSource should advance past the empty wal, or else the test will fail
308    waitForLogAdvance(numRs);
309
310    // Now we should expect numOfEntriesToReplicate entries
311    // replicated from each region server. This makes sure we didn't loose data
312    // from any previous batch when we encounter EOF exception for empty file.
313    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2,
314      replicatedEntries.size());
315
316    // We expect two batch of replication to be shipped which will
317    // for non empty WAL
318    Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get());
319    verifyNumberOfLogsInQueue(1, numRs);
320    // we're now writing to the new wal
321    // if everything works, the source should've stopped reading from the empty wal, and start
322    // replicating from the new wal
323    runSimplePutDeleteTest();
324    rollWalsAndWaitForDeque(numRs);
325  }
326
327  // inject our empty wal into the replication queue, and then roll the original wal, which
328  // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
329  // determine if the file being replicated currently is still opened for write, so just inject a
330  // new wal to the replication queue does not mean the previous file is closed.
331  private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException {
332    for (int i = 0; i < numRs; i++) {
333      HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
334      Replication replicationService = (Replication) hrs.getReplicationSourceService();
335      replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i));
336      replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i));
337      RegionInfo regionInfo =
338        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
339      WAL wal = hrs.getWAL(regionInfo);
340      wal.rollWriter(true);
341    }
342  }
343
344  protected WALKeyImpl getWalKeyImpl() {
345    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes);
346  }
347
348  // Roll the WAL and wait for it to get deque from the log queue
349  private void rollWalsAndWaitForDeque(int numRs) throws IOException {
350    RegionInfo regionInfo =
351      UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
352    for (int i = 0; i < numRs; i++) {
353      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
354      wal.rollWriter();
355    }
356    waitForLogAdvance(numRs);
357  }
358
359  private void appendEntriesToWal(int numEntries, WAL wal) throws IOException {
360    long txId = -1;
361    for (int i = 0; i < numEntries; i++) {
362      byte[] b = Bytes.toBytes(Integer.toString(i));
363      KeyValue kv = new KeyValue(b, famName, b);
364      WALEdit edit = new WALEdit();
365      edit.add(kv);
366      txId = wal.appendData(info, getWalKeyImpl(), edit);
367    }
368    wal.sync(txId);
369  }
370}