001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.NavigableMap; 024import java.util.TreeMap; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.KeyValue; 029import org.apache.hadoop.hbase.Waiter; 030import org.apache.hadoop.hbase.client.RegionInfo; 031import org.apache.hadoop.hbase.client.RegionInfoBuilder; 032import org.apache.hadoop.hbase.regionserver.HRegionServer; 033import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 034import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 035import org.apache.hadoop.hbase.replication.regionserver.Replication; 036import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; 037import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.apache.hadoop.hbase.testclassification.ReplicationTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 042import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 043import org.apache.hadoop.hbase.wal.WAL; 044import org.apache.hadoop.hbase.wal.WALEdit; 045import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 046import org.apache.hadoop.hbase.wal.WALKeyImpl; 047import org.junit.Assert; 048import org.junit.Before; 049import org.junit.ClassRule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052 053@Category({ ReplicationTests.class, LargeTests.class }) 054public class TestReplicationEmptyWALRecovery extends TestReplicationBase { 055 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 056 static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 057 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 058 059 @ClassRule 060 public static final HBaseClassTestRule CLASS_RULE = 061 HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class); 062 063 @Before 064 public void setUp() throws IOException, InterruptedException { 065 cleanUp(); 066 scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL); 067 replicateCount.set(0); 068 replicatedEntries.clear(); 069 } 070 071 /** 072 * Waits until there is only one log(the current writing one) in the replication queue 073 * @param numRs number of region servers 074 */ 075 private void waitForLogAdvance(int numRs) { 076 Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() { 077 @Override 078 public boolean evaluate() throws Exception { 079 for (int i = 0; i < numRs; i++) { 080 HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i); 081 RegionInfo regionInfo = 082 UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); 083 WAL wal = hrs.getWAL(regionInfo); 084 Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName(); 085 Replication replicationService = 086 (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService(); 087 for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() 088 .getSources()) { 089 ReplicationSource source = (ReplicationSource) rsi; 090 // We are making sure that there is only one log queue and that is for the 091 // current WAL of region server 092 String logPrefix = source.getQueues().keySet().stream().findFirst().get(); 093 if ( 094 !currentFile.equals(source.getCurrentPath()) 095 || source.getQueues().keySet().size() != 1 096 || source.getQueues().get(logPrefix).size() != 1 097 ) { 098 return false; 099 } 100 } 101 } 102 return true; 103 } 104 }); 105 } 106 107 private void verifyNumberOfLogsInQueue(int numQueues, int numRs) { 108 Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() { 109 @Override 110 public boolean evaluate() { 111 for (int i = 0; i < numRs; i++) { 112 Replication replicationService = 113 (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService(); 114 for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() 115 .getSources()) { 116 ReplicationSource source = (ReplicationSource) rsi; 117 String logPrefix = source.getQueues().keySet().stream().findFirst().get(); 118 if (source.getQueues().get(logPrefix).size() != numQueues) { 119 return false; 120 } 121 } 122 } 123 return true; 124 } 125 }); 126 } 127 128 @Test 129 public void testEmptyWALRecovery() throws Exception { 130 final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 131 // for each RS, create an empty wal with same walGroupId 132 final List<Path> emptyWalPaths = new ArrayList<>(); 133 long ts = EnvironmentEdgeManager.currentTime(); 134 for (int i = 0; i < numRs; i++) { 135 RegionInfo regionInfo = 136 UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); 137 WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); 138 Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); 139 String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); 140 Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); 141 UTIL1.getTestFileSystem().create(emptyWalPath).close(); 142 emptyWalPaths.add(emptyWalPath); 143 } 144 145 injectEmptyWAL(numRs, emptyWalPaths); 146 147 // ReplicationSource should advance past the empty wal, or else the test will fail 148 waitForLogAdvance(numRs); 149 verifyNumberOfLogsInQueue(1, numRs); 150 // we're now writing to the new wal 151 // if everything works, the source should've stopped reading from the empty wal, and start 152 // replicating from the new wal 153 runSimplePutDeleteTest(); 154 rollWalsAndWaitForDeque(numRs); 155 } 156 157 /** 158 * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we 159 * see the empty and handle the EOF exception, we are able to ship the previous batch of entries 160 * without loosing it. This test also tests the number of batches shipped 161 * @throws Exception throws any exception 162 */ 163 @Test 164 public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception { 165 final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 166 // make sure we only the current active wal file in queue 167 verifyNumberOfLogsInQueue(1, numRs); 168 169 // Disable the replication peer to accumulate the non empty WAL followed by empty WAL 170 hbaseAdmin.disableReplicationPeer(PEER_ID2); 171 172 int numOfEntriesToReplicate = 20; 173 // for each RS, create an empty wal with same walGroupId 174 final List<Path> emptyWalPaths = new ArrayList<>(); 175 long ts = EnvironmentEdgeManager.currentTime(); 176 for (int i = 0; i < numRs; i++) { 177 RegionInfo regionInfo = 178 UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); 179 WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); 180 Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); 181 182 appendEntriesToWal(numOfEntriesToReplicate, wal); 183 String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); 184 Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts); 185 UTIL1.getTestFileSystem().create(emptyWalPath).close(); 186 emptyWalPaths.add(emptyWalPath); 187 } 188 189 injectEmptyWAL(numRs, emptyWalPaths); 190 // There should be three WALs in queue 191 // 1. non empty WAL 192 // 2. empty WAL 193 // 3. live WAL 194 verifyNumberOfLogsInQueue(3, numRs); 195 hbaseAdmin.enableReplicationPeer(PEER_ID2); 196 // ReplicationSource should advance past the empty wal, or else the test will fail 197 waitForLogAdvance(numRs); 198 199 // Now we should expect numOfEntriesToReplicate entries 200 // replicated from each region server. This makes sure we didn't loose data 201 // from any previous batch when we encounter EOF exception for empty file. 202 Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs, 203 replicatedEntries.size()); 204 205 // We expect just one batch of replication which will 206 // be from when we handle the EOF exception. 207 Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue()); 208 verifyNumberOfLogsInQueue(1, numRs); 209 // we're now writing to the new wal 210 // if everything works, the source should've stopped reading from the empty wal, and start 211 // replicating from the new wal 212 runSimplePutDeleteTest(); 213 rollWalsAndWaitForDeque(numRs); 214 } 215 216 /** 217 * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we 218 * see the empty WAL and handle the EOF exception, we are able to proceed with next batch and 219 * replicate it properly without missing data. 220 * @throws Exception throws any exception 221 */ 222 @Test 223 public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception { 224 // Disable the replication peer to accumulate the non empty WAL followed by empty WAL 225 hbaseAdmin.disableReplicationPeer(PEER_ID2); 226 int numOfEntriesToReplicate = 20; 227 228 final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 229 // for each RS, create an empty wal with same walGroupId 230 final List<Path> emptyWalPaths = new ArrayList<>(); 231 long ts = EnvironmentEdgeManager.currentTime(); 232 WAL wal = null; 233 for (int i = 0; i < numRs; i++) { 234 RegionInfo regionInfo = 235 UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); 236 wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); 237 Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); 238 appendEntriesToWal(numOfEntriesToReplicate, wal); 239 String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); 240 Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); 241 UTIL1.getTestFileSystem().create(emptyWalPath).close(); 242 emptyWalPaths.add(emptyWalPath); 243 244 } 245 injectEmptyWAL(numRs, emptyWalPaths); 246 // roll the WAL now 247 for (int i = 0; i < numRs; i++) { 248 wal.rollWriter(); 249 } 250 hbaseAdmin.enableReplicationPeer(PEER_ID2); 251 // ReplicationSource should advance past the empty wal, or else the test will fail 252 waitForLogAdvance(numRs); 253 254 // Now we should expect numOfEntriesToReplicate entries 255 // replicated from each region server. This makes sure we didn't loose data 256 // from any previous batch when we encounter EOF exception for empty file. 257 Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs, 258 replicatedEntries.size()); 259 260 // We expect just one batch of replication to be shipped which will 261 // for non empty WAL 262 Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get()); 263 verifyNumberOfLogsInQueue(1, numRs); 264 // we're now writing to the new wal 265 // if everything works, the source should've stopped reading from the empty wal, and start 266 // replicating from the new wal 267 runSimplePutDeleteTest(); 268 rollWalsAndWaitForDeque(numRs); 269 } 270 271 /** 272 * This test make sure we replicate all the enties from the non empty WALs which are surrounding 273 * the empty WALs 274 * @throws Exception throws exception 275 */ 276 @Test 277 public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception { 278 // Disable the replication peer to accumulate the non empty WAL followed by empty WAL 279 hbaseAdmin.disableReplicationPeer(PEER_ID2); 280 int numOfEntriesToReplicate = 20; 281 282 final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 283 // for each RS, create an empty wal with same walGroupId 284 final List<Path> emptyWalPaths = new ArrayList<>(); 285 long ts = EnvironmentEdgeManager.currentTime(); 286 WAL wal = null; 287 for (int i = 0; i < numRs; i++) { 288 RegionInfo regionInfo = 289 UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); 290 wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); 291 Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); 292 appendEntriesToWal(numOfEntriesToReplicate, wal); 293 wal.rollWriter(); 294 String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); 295 Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); 296 UTIL1.getTestFileSystem().create(emptyWalPath).close(); 297 emptyWalPaths.add(emptyWalPath); 298 } 299 injectEmptyWAL(numRs, emptyWalPaths); 300 301 // roll the WAL again with some entries 302 for (int i = 0; i < numRs; i++) { 303 appendEntriesToWal(numOfEntriesToReplicate, wal); 304 wal.rollWriter(); 305 } 306 307 hbaseAdmin.enableReplicationPeer(PEER_ID2); 308 // ReplicationSource should advance past the empty wal, or else the test will fail 309 waitForLogAdvance(numRs); 310 311 // Now we should expect numOfEntriesToReplicate entries 312 // replicated from each region server. This makes sure we didn't loose data 313 // from any previous batch when we encounter EOF exception for empty file. 314 Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2, 315 replicatedEntries.size()); 316 317 // We expect two batch of replication to be shipped which will 318 // for non empty WAL 319 Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get()); 320 verifyNumberOfLogsInQueue(1, numRs); 321 // we're now writing to the new wal 322 // if everything works, the source should've stopped reading from the empty wal, and start 323 // replicating from the new wal 324 runSimplePutDeleteTest(); 325 rollWalsAndWaitForDeque(numRs); 326 } 327 328 // inject our empty wal into the replication queue, and then roll the original wal, which 329 // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to 330 // determine if the file being replicated currently is still opened for write, so just inject a 331 // new wal to the replication queue does not mean the previous file is closed. 332 private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException { 333 for (int i = 0; i < numRs; i++) { 334 HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i); 335 Replication replicationService = (Replication) hrs.getReplicationSourceService(); 336 replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i)); 337 RegionInfo regionInfo = 338 UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); 339 WAL wal = hrs.getWAL(regionInfo); 340 wal.rollWriter(true); 341 } 342 } 343 344 protected WALKeyImpl getWalKeyImpl() { 345 return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes); 346 } 347 348 // Roll the WAL and wait for it to get deque from the log queue 349 private void rollWalsAndWaitForDeque(int numRs) throws IOException { 350 RegionInfo regionInfo = 351 UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); 352 for (int i = 0; i < numRs; i++) { 353 WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); 354 wal.rollWriter(); 355 } 356 waitForLogAdvance(numRs); 357 } 358 359 private void appendEntriesToWal(int numEntries, WAL wal) throws IOException { 360 long txId = -1; 361 for (int i = 0; i < numEntries; i++) { 362 byte[] b = Bytes.toBytes(Integer.toString(i)); 363 KeyValue kv = new KeyValue(b, famName, b); 364 WALEdit edit = new WALEdit(); 365 WALEditInternalHelper.addExtendedCell(edit, kv); 366 txId = wal.appendData(info, getWalKeyImpl(), edit); 367 } 368 wal.sync(txId); 369 } 370}