/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests that a replication source advances past an empty WAL file that has been injected into its
 * queue, on its own or mixed with non empty WALs, and that the entries of the surrounding non empty
 * WALs are still replicated.
 */
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
  MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
  NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);

  @Before
  public void setUp() throws IOException, InterruptedException {
    cleanUp();
    scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL);
    replicateCount.set(0);
    replicatedEntries.clear();
  }

  /**
   * Waits until there is only one log (the one currently being written) in the replication queue
   * of every source on every region server.
   * @param numRs number of region servers
   */
  private void waitForLogAdvance(int numRs) {
    Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        // The region looked up does not depend on the region server, so resolve it once per poll.
        RegionInfo regionInfo =
          UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
        for (int i = 0; i < numRs; i++) {
          HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
          WAL wal = hrs.getWAL(regionInfo);
          Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
          Replication replicationService = (Replication) hrs.getReplicationSourceService();
          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
            .getSources()) {
            ReplicationSource source = (ReplicationSource) rsi;
            // We are making sure that there is only one log queue and that is for the
            // current WAL of region server
            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
            if (
              !currentFile.equals(source.getCurrentPath())
                || source.getQueues().keySet().size() != 1
                || source.getQueues().get(logPrefix).size() != 1
            ) {
              return false;
            }
          }
        }
        return true;
      }
    });
  }

  /**
   * Waits until the first log queue of every replication source on every region server holds
   * exactly {@code numQueues} logs.
   * @param numQueues expected number of logs in the queue
   * @param numRs     number of region servers
   */
  private void verifyNumberOfLogsInQueue(int numQueues, int numRs) {
    Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() {
        for (int i = 0; i < numRs; i++) {
          Replication replicationService =
            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
            .getSources()) {
            ReplicationSource source = (ReplicationSource) rsi;
            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
            if (source.getQueues().get(logPrefix).size() != numQueues) {
              return false;
            }
          }
        }
        return true;
      }
    });
  }

  @Test
  public void testEmptyWALRecovery() throws Exception {
    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
    // for each RS, create an empty wal with same walGroupId
    final List<Path> emptyWalPaths = new ArrayList<>();
    long ts = EnvironmentEdgeManager.currentTime();
    for (int i = 0; i < numRs; i++) {
      RegionInfo regionInfo =
        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
      UTIL1.getTestFileSystem().create(emptyWalPath).close();
      emptyWalPaths.add(emptyWalPath);
    }

    injectEmptyWAL(numRs, emptyWalPaths);

    // ReplicationSource should advance past the empty wal, or else the test will fail
    waitForLogAdvance(numRs);
    verifyNumberOfLogsInQueue(1, numRs);
    // we're now writing to the new wal
    // if everything works, the source should've stopped reading from the empty wal, and start
    // replicating from the new wal
    runSimplePutDeleteTest();
    rollWalsAndWaitForDeque(numRs);
  }

  /**
   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
   * see the empty WAL and handle the EOF exception, we are able to ship the previous batch of
   * entries without losing it. This test also tests the number of batches shipped.
   * @throws Exception throws any exception
   */
  @Test
  public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
    // make sure we only have the current active wal file in queue
    verifyNumberOfLogsInQueue(1, numRs);

    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
    hbaseAdmin.disableReplicationPeer(PEER_ID2);

    int numOfEntriesToReplicate = 20;
    // for each RS, create an empty wal with same walGroupId
    final List<Path> emptyWalPaths = new ArrayList<>();
    long ts = EnvironmentEdgeManager.currentTime();
    for (int i = 0; i < numRs; i++) {
      RegionInfo regionInfo =
        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);

      appendEntriesToWal(numOfEntriesToReplicate, wal);
      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
      Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
      UTIL1.getTestFileSystem().create(emptyWalPath).close();
      emptyWalPaths.add(emptyWalPath);
    }

    injectEmptyWAL(numRs, emptyWalPaths);
    // There should be three WALs in queue
    // 1. non empty WAL
    // 2. empty WAL
    // 3. live WAL
    verifyNumberOfLogsInQueue(3, numRs);
    hbaseAdmin.enableReplicationPeer(PEER_ID2);
    // ReplicationSource should advance past the empty wal, or else the test will fail
    waitForLogAdvance(numRs);

    // Now we should expect numOfEntriesToReplicate entries
    // replicated from each region server. This makes sure we didn't lose data
    // from any previous batch when we encounter EOF exception for empty file.
    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
      replicatedEntries.size());

    // We expect just one batch of replication which will
    // be from when we handle the EOF exception.
    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue());
    verifyNumberOfLogsInQueue(1, numRs);
    // we're now writing to the new wal
    // if everything works, the source should've stopped reading from the empty wal, and start
    // replicating from the new wal
    runSimplePutDeleteTest();
    rollWalsAndWaitForDeque(numRs);
  }

  /**
   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
   * see the empty WAL and handle the EOF exception, we are able to proceed with next batch and
   * replicate it properly without missing data.
   * @throws Exception throws any exception
   */
  @Test
  public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception {
    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
    hbaseAdmin.disableReplicationPeer(PEER_ID2);
    int numOfEntriesToReplicate = 20;

    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
    // for each RS, create an empty wal with same walGroupId
    final List<Path> emptyWalPaths = new ArrayList<>();
    long ts = EnvironmentEdgeManager.currentTime();
    RegionInfo regionInfo =
      UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
    for (int i = 0; i < numRs; i++) {
      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
      appendEntriesToWal(numOfEntriesToReplicate, wal);
      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
      UTIL1.getTestFileSystem().create(emptyWalPath).close();
      emptyWalPaths.add(emptyWalPath);
    }
    injectEmptyWAL(numRs, emptyWalPaths);
    // roll the WAL of every region server now; look the WAL up per server so that each one is
    // rolled once, instead of rolling the last server's WAL numRs times
    for (int i = 0; i < numRs; i++) {
      UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo).rollWriter();
    }
    hbaseAdmin.enableReplicationPeer(PEER_ID2);
    // ReplicationSource should advance past the empty wal, or else the test will fail
    waitForLogAdvance(numRs);

    // Now we should expect numOfEntriesToReplicate entries
    // replicated from each region server. This makes sure we didn't lose data
    // from any previous batch when we encounter EOF exception for empty file.
    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
      replicatedEntries.size());

    // We expect just one batch of replication to be shipped, which will
    // be for the non empty WAL
    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get());
    verifyNumberOfLogsInQueue(1, numRs);
    // we're now writing to the new wal
    // if everything works, the source should've stopped reading from the empty wal, and start
    // replicating from the new wal
    runSimplePutDeleteTest();
    rollWalsAndWaitForDeque(numRs);
  }

  /**
   * This test makes sure we replicate all the entries from the non empty WALs which are
   * surrounding the empty WALs.
   * @throws Exception throws exception
   */
  @Test
  public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception {
    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
    hbaseAdmin.disableReplicationPeer(PEER_ID2);
    int numOfEntriesToReplicate = 20;

    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
    // for each RS, create an empty wal with same walGroupId
    final List<Path> emptyWalPaths = new ArrayList<>();
    long ts = EnvironmentEdgeManager.currentTime();
    RegionInfo regionInfo =
      UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
    for (int i = 0; i < numRs; i++) {
      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
      appendEntriesToWal(numOfEntriesToReplicate, wal);
      wal.rollWriter();
      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
      UTIL1.getTestFileSystem().create(emptyWalPath).close();
      emptyWalPaths.add(emptyWalPath);
    }
    injectEmptyWAL(numRs, emptyWalPaths);

    // roll the WAL of every region server again with some entries; look the WAL up per server so
    // that each one receives the entries, instead of only the last server's WAL
    for (int i = 0; i < numRs; i++) {
      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
      appendEntriesToWal(numOfEntriesToReplicate, wal);
      wal.rollWriter();
    }

    hbaseAdmin.enableReplicationPeer(PEER_ID2);
    // ReplicationSource should advance past the empty wal, or else the test will fail
    waitForLogAdvance(numRs);

    // Now we should expect 2 * numOfEntriesToReplicate entries
    // replicated from each region server. This makes sure we didn't lose data
    // from any previous batch when we encounter EOF exception for empty file.
    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2,
      replicatedEntries.size());

    // We expect two batches of replication to be shipped, which will
    // be for the non empty WALs
    Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get());
    verifyNumberOfLogsInQueue(1, numRs);
    // we're now writing to the new wal
    // if everything works, the source should've stopped reading from the empty wal, and start
    // replicating from the new wal
    runSimplePutDeleteTest();
    rollWalsAndWaitForDeque(numRs);
  }

  /**
   * Injects our empty wal into the replication queue, and then rolls the original wal, which
   * enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
   * determine if the file being replicated currently is still opened for write, so just injecting
   * a new wal to the replication queue does not mean the previous file is closed.
   * @param numRs         number of region servers
   * @param emptyWalPaths the empty wal to inject for each region server, indexed by region server
   */
  private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException {
    for (int i = 0; i < numRs; i++) {
      HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
      Replication replicationService = (Replication) hrs.getReplicationSourceService();
      replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i));
      replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i));
      RegionInfo regionInfo =
        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
      WAL wal = hrs.getWAL(regionInfo);
      wal.rollWriter(true);
    }
  }

  /**
   * Builds the WAL key used for the entries appended by {@link #appendEntriesToWal(int, WAL)}.
   */
  protected WALKeyImpl getWalKeyImpl() {
    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes);
  }

  /**
   * Rolls the WAL of every region server and waits for the rolled WAL to be dequeued from the log
   * queue.
   * @param numRs number of region servers
   */
  private void rollWalsAndWaitForDeque(int numRs) throws IOException {
    RegionInfo regionInfo =
      UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
    for (int i = 0; i < numRs; i++) {
      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
      wal.rollWriter();
    }
    waitForLogAdvance(numRs);
  }

  /**
   * Appends {@code numEntries} single-cell edits to the given WAL and syncs up to the last one.
   * @param numEntries number of entries to append
   * @param wal        the WAL to append to
   */
  private void appendEntriesToWal(int numEntries, WAL wal) throws IOException {
    long txId = -1;
    for (int i = 0; i < numEntries; i++) {
      byte[] b = Bytes.toBytes(Integer.toString(i));
      KeyValue kv = new KeyValue(b, famName, b);
      WALEdit edit = new WALEdit();
      edit.add(kv);
      txId = wal.appendData(info, getWalKeyImpl(), edit);
    }
    wal.sync(txId);
  }
}