001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.NavigableMap;
024import java.util.TreeMap;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.KeyValue;
029import org.apache.hadoop.hbase.Waiter;
030import org.apache.hadoop.hbase.client.RegionInfo;
031import org.apache.hadoop.hbase.client.RegionInfoBuilder;
032import org.apache.hadoop.hbase.regionserver.HRegionServer;
033import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
034import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
035import org.apache.hadoop.hbase.replication.regionserver.Replication;
036import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
037import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.testclassification.ReplicationTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
042import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
043import org.apache.hadoop.hbase.wal.WAL;
044import org.apache.hadoop.hbase.wal.WALEdit;
045import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
046import org.apache.hadoop.hbase.wal.WALKeyImpl;
047import org.junit.Assert;
048import org.junit.Before;
049import org.junit.ClassRule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052
053@Category({ ReplicationTests.class, LargeTests.class })
054public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
055  MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
056  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
057  NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
058
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061    HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
062
063  @Before
064  public void setUp() throws IOException, InterruptedException {
065    cleanUp();
066    scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL);
067    replicateCount.set(0);
068    replicatedEntries.clear();
069  }
070
071  /**
072   * Waits until there is only one log(the current writing one) in the replication queue
073   * @param numRs number of region servers
074   */
075  private void waitForLogAdvance(int numRs) {
076    Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() {
077      @Override
078      public boolean evaluate() throws Exception {
079        for (int i = 0; i < numRs; i++) {
080          HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
081          RegionInfo regionInfo =
082            UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
083          WAL wal = hrs.getWAL(regionInfo);
084          Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
085          Replication replicationService =
086            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
087          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
088            .getSources()) {
089            ReplicationSource source = (ReplicationSource) rsi;
090            // We are making sure that there is only one log queue and that is for the
091            // current WAL of region server
092            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
093            if (
094              !currentFile.equals(source.getCurrentPath())
095                || source.getQueues().keySet().size() != 1
096                || source.getQueues().get(logPrefix).size() != 1
097            ) {
098              return false;
099            }
100          }
101        }
102        return true;
103      }
104    });
105  }
106
107  private void verifyNumberOfLogsInQueue(int numQueues, int numRs) {
108    Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
109      @Override
110      public boolean evaluate() {
111        for (int i = 0; i < numRs; i++) {
112          Replication replicationService =
113            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
114          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
115            .getSources()) {
116            ReplicationSource source = (ReplicationSource) rsi;
117            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
118            if (source.getQueues().get(logPrefix).size() != numQueues) {
119              return false;
120            }
121          }
122        }
123        return true;
124      }
125    });
126  }
127
128  @Test
129  public void testEmptyWALRecovery() throws Exception {
130    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
131    // for each RS, create an empty wal with same walGroupId
132    final List<Path> emptyWalPaths = new ArrayList<>();
133    long ts = EnvironmentEdgeManager.currentTime();
134    for (int i = 0; i < numRs; i++) {
135      RegionInfo regionInfo =
136        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
137      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
138      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
139      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
140      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
141      UTIL1.getTestFileSystem().create(emptyWalPath).close();
142      emptyWalPaths.add(emptyWalPath);
143    }
144
145    injectEmptyWAL(numRs, emptyWalPaths);
146
147    // ReplicationSource should advance past the empty wal, or else the test will fail
148    waitForLogAdvance(numRs);
149    verifyNumberOfLogsInQueue(1, numRs);
150    // we're now writing to the new wal
151    // if everything works, the source should've stopped reading from the empty wal, and start
152    // replicating from the new wal
153    runSimplePutDeleteTest();
154    rollWalsAndWaitForDeque(numRs);
155  }
156
157  /**
158   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
159   * see the empty and handle the EOF exception, we are able to ship the previous batch of entries
160   * without loosing it. This test also tests the number of batches shipped
161   * @throws Exception throws any exception
162   */
163  @Test
164  public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
165    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
166    // make sure we only the current active wal file in queue
167    verifyNumberOfLogsInQueue(1, numRs);
168
169    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
170    hbaseAdmin.disableReplicationPeer(PEER_ID2);
171
172    int numOfEntriesToReplicate = 20;
173    // for each RS, create an empty wal with same walGroupId
174    final List<Path> emptyWalPaths = new ArrayList<>();
175    long ts = EnvironmentEdgeManager.currentTime();
176    for (int i = 0; i < numRs; i++) {
177      RegionInfo regionInfo =
178        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
179      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
180      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
181
182      appendEntriesToWal(numOfEntriesToReplicate, wal);
183      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
184      Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
185      UTIL1.getTestFileSystem().create(emptyWalPath).close();
186      emptyWalPaths.add(emptyWalPath);
187    }
188
189    injectEmptyWAL(numRs, emptyWalPaths);
190    // There should be three WALs in queue
191    // 1. non empty WAL
192    // 2. empty WAL
193    // 3. live WAL
194    verifyNumberOfLogsInQueue(3, numRs);
195    hbaseAdmin.enableReplicationPeer(PEER_ID2);
196    // ReplicationSource should advance past the empty wal, or else the test will fail
197    waitForLogAdvance(numRs);
198
199    // Now we should expect numOfEntriesToReplicate entries
200    // replicated from each region server. This makes sure we didn't loose data
201    // from any previous batch when we encounter EOF exception for empty file.
202    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
203      replicatedEntries.size());
204
205    // We expect just one batch of replication which will
206    // be from when we handle the EOF exception.
207    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue());
208    verifyNumberOfLogsInQueue(1, numRs);
209    // we're now writing to the new wal
210    // if everything works, the source should've stopped reading from the empty wal, and start
211    // replicating from the new wal
212    runSimplePutDeleteTest();
213    rollWalsAndWaitForDeque(numRs);
214  }
215
216  /**
217   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
218   * see the empty WAL and handle the EOF exception, we are able to proceed with next batch and
219   * replicate it properly without missing data.
220   * @throws Exception throws any exception
221   */
222  @Test
223  public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception {
224    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
225    hbaseAdmin.disableReplicationPeer(PEER_ID2);
226    int numOfEntriesToReplicate = 20;
227
228    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
229    // for each RS, create an empty wal with same walGroupId
230    final List<Path> emptyWalPaths = new ArrayList<>();
231    long ts = EnvironmentEdgeManager.currentTime();
232    WAL wal = null;
233    for (int i = 0; i < numRs; i++) {
234      RegionInfo regionInfo =
235        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
236      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
237      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
238      appendEntriesToWal(numOfEntriesToReplicate, wal);
239      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
240      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
241      UTIL1.getTestFileSystem().create(emptyWalPath).close();
242      emptyWalPaths.add(emptyWalPath);
243
244    }
245    injectEmptyWAL(numRs, emptyWalPaths);
246    // roll the WAL now
247    for (int i = 0; i < numRs; i++) {
248      wal.rollWriter();
249    }
250    hbaseAdmin.enableReplicationPeer(PEER_ID2);
251    // ReplicationSource should advance past the empty wal, or else the test will fail
252    waitForLogAdvance(numRs);
253
254    // Now we should expect numOfEntriesToReplicate entries
255    // replicated from each region server. This makes sure we didn't loose data
256    // from any previous batch when we encounter EOF exception for empty file.
257    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
258      replicatedEntries.size());
259
260    // We expect just one batch of replication to be shipped which will
261    // for non empty WAL
262    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get());
263    verifyNumberOfLogsInQueue(1, numRs);
264    // we're now writing to the new wal
265    // if everything works, the source should've stopped reading from the empty wal, and start
266    // replicating from the new wal
267    runSimplePutDeleteTest();
268    rollWalsAndWaitForDeque(numRs);
269  }
270
271  /**
272   * This test make sure we replicate all the enties from the non empty WALs which are surrounding
273   * the empty WALs
274   * @throws Exception throws exception
275   */
276  @Test
277  public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception {
278    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
279    hbaseAdmin.disableReplicationPeer(PEER_ID2);
280    int numOfEntriesToReplicate = 20;
281
282    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
283    // for each RS, create an empty wal with same walGroupId
284    final List<Path> emptyWalPaths = new ArrayList<>();
285    long ts = EnvironmentEdgeManager.currentTime();
286    WAL wal = null;
287    for (int i = 0; i < numRs; i++) {
288      RegionInfo regionInfo =
289        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
290      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
291      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
292      appendEntriesToWal(numOfEntriesToReplicate, wal);
293      wal.rollWriter();
294      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
295      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
296      UTIL1.getTestFileSystem().create(emptyWalPath).close();
297      emptyWalPaths.add(emptyWalPath);
298    }
299    injectEmptyWAL(numRs, emptyWalPaths);
300
301    // roll the WAL again with some entries
302    for (int i = 0; i < numRs; i++) {
303      appendEntriesToWal(numOfEntriesToReplicate, wal);
304      wal.rollWriter();
305    }
306
307    hbaseAdmin.enableReplicationPeer(PEER_ID2);
308    // ReplicationSource should advance past the empty wal, or else the test will fail
309    waitForLogAdvance(numRs);
310
311    // Now we should expect numOfEntriesToReplicate entries
312    // replicated from each region server. This makes sure we didn't loose data
313    // from any previous batch when we encounter EOF exception for empty file.
314    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2,
315      replicatedEntries.size());
316
317    // We expect two batch of replication to be shipped which will
318    // for non empty WAL
319    Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get());
320    verifyNumberOfLogsInQueue(1, numRs);
321    // we're now writing to the new wal
322    // if everything works, the source should've stopped reading from the empty wal, and start
323    // replicating from the new wal
324    runSimplePutDeleteTest();
325    rollWalsAndWaitForDeque(numRs);
326  }
327
328  // inject our empty wal into the replication queue, and then roll the original wal, which
329  // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
330  // determine if the file being replicated currently is still opened for write, so just inject a
331  // new wal to the replication queue does not mean the previous file is closed.
332  private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException {
333    for (int i = 0; i < numRs; i++) {
334      HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
335      Replication replicationService = (Replication) hrs.getReplicationSourceService();
336      replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i));
337      RegionInfo regionInfo =
338        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
339      WAL wal = hrs.getWAL(regionInfo);
340      wal.rollWriter(true);
341    }
342  }
343
344  protected WALKeyImpl getWalKeyImpl() {
345    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes);
346  }
347
348  // Roll the WAL and wait for it to get deque from the log queue
349  private void rollWalsAndWaitForDeque(int numRs) throws IOException {
350    RegionInfo regionInfo =
351      UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
352    for (int i = 0; i < numRs; i++) {
353      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
354      wal.rollWriter();
355    }
356    waitForLogAdvance(numRs);
357  }
358
359  private void appendEntriesToWal(int numEntries, WAL wal) throws IOException {
360    long txId = -1;
361    for (int i = 0; i < numEntries; i++) {
362      byte[] b = Bytes.toBytes(Integer.toString(i));
363      KeyValue kv = new KeyValue(b, famName, b);
364      WALEdit edit = new WALEdit();
365      WALEditInternalHelper.addExtendedCell(edit, kv);
366      txId = wal.appendData(info, getWalKeyImpl(), edit);
367    }
368    wal.sync(txId);
369  }
370}