/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sync replication scenario in which the ACTIVE cluster accepts a write locally that never makes
 * it to the remote cluster. After the roles are swapped, the test checks that the former ACTIVE
 * cluster no longer serves the data it had written and that the data is later replicated back
 * from the peer.
 * <p>
 * Both mini clusters are started with {@link BrokenRemoteAsyncFSWALProvider} as their WAL
 * provider so that the remote side of the WAL can be broken on demand.
 */
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSyncReplicationMoreLogsInLocalGiveUpSplitting extends SyncReplicationTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSyncReplicationMoreLogsInLocalGiveUpSplitting.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestSyncReplicationMoreLogsInLocalGiveUpSplitting.class);

  /**
   * Installs {@link BrokenRemoteAsyncFSWALProvider} on both clusters' configurations and then
   * delegates to {@link SyncReplicationTestBase#setUp()}.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    // The provider has to be configured before the base class set up runs.
    UTIL1.getConfiguration().setClass(WALFactory.WAL_PROVIDER,
      BrokenRemoteAsyncFSWALProvider.class, WALProvider.class);
    UTIL2.getConfiguration().setClass(WALFactory.WAL_PROVIDER,
      BrokenRemoteAsyncFSWALProvider.class, WALProvider.class);
    SyncReplicationTestBase.setUp();
  }

  /** Builds a put for row {@code id} that stores {@code id} under CF:CQ. */
  private Put singleCellPut(int id) {
    return new Put(Bytes.toBytes(id)).addColumn(CF, CQ, Bytes.toBytes(id));
  }

  /** Builds a get for row {@code id}. */
  private Get rowGet(int id) {
    return new Get(Bytes.toBytes(id));
  }

  /**
   * Runs the whole scenario: a successful write of row 0, a failed write of row 1 while the
   * remote WAL is broken, a role swap between the two clusters, and finally replication of the
   * data back to cluster 1.
   */
  @Test
  public void testSplitLog() throws Exception {
    // Stop normal replication in both directions, then make cluster 1 ACTIVE and cluster 2
    // STANDBY.
    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
    UTIL2.getAdmin().disableReplicationPeer(PEER_ID);
    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.STANDBY);
    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.ACTIVE);

    // Row 0 is written while the remote WAL is still healthy.
    try (Table activeTable = UTIL1.getConnection().getTable(TABLE_NAME)) {
      activeTable.put(singleCellPut(0));
    }

    // Break the remote side of the WAL used by the region server hosting the table, and hold
    // back log rolling until the peer has been promoted further below.
    HRegionServer hostingServer = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
    WALFactory walFactory = hostingServer.getWalFactory();
    BrokenRemoteAsyncFSWALProvider.BrokenRemoteAsyncFSWAL brokenWal =
      (BrokenRemoteAsyncFSWALProvider.BrokenRemoteAsyncFSWAL) walFactory
        .getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
    brokenWal.setRemoteBroken();
    brokenWal.suspendLogRoll();

    // A single-attempt write with a 5 second write rpc timeout must now fail on the client.
    try (AsyncConnection asyncConn =
      ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
      AsyncTable<?> asyncTable = asyncConn.getTableBuilder(TABLE_NAME).setMaxAttempts(1)
        .setWriteRpcTimeout(5, TimeUnit.SECONDS).build();
      ExecutionException writeFailure = assertThrows(ExecutionException.class,
        () -> asyncTable.put(singleCellPut(1)).get());
      LOG.info("Expected error:", writeFailure);
    }

    // Wait on the broken WAL's arrival signal before promoting cluster 2, then release the roll.
    brokenWal.waitUntilArrive();
    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.DOWNGRADE_ACTIVE);
    brokenWal.resumeLogRoll();

    try (Table peerTable = UTIL2.getConnection().getTable(TABLE_NAME)) {
      byte[] storedValue = peerTable.get(rowGet(0)).getValue(CF, CQ);
      assertEquals(0, Bytes.toInt(storedValue));
      // Row 1 never reached the remote WAL, so the promoted cluster must not have it.
      assertFalse(peerTable.exists(rowGet(1)));
    }

    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.STANDBY);
    // Confirm the region is back online. waitTableAvailable can not be used here because a
    // table in STANDBY state rejects client reads; the rejection itself is what we look for.
    try (Table standbyTable = UTIL1.getConnection().getTable(TABLE_NAME)) {
      Exception readFailure =
        assertThrows(Exception.class, () -> standbyTable.exists(rowGet(0)));
      assertThat(readFailure, either(instanceOf(DoNotRetryIOException.class))
        .or(instanceOf(RetriesExhaustedException.class)));
      assertThat(readFailure.getMessage(), containsString("STANDBY"));
    }

    HRegion standbyRegion = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
    // Splitting of the whole WAL file was given up, so even row 0 is gone from this region.
    assertTrue(standbyRegion.get(rowGet(0)).isEmpty());

    UTIL2.getAdmin().enableReplicationPeer(PEER_ID);
    // With the peer re-enabled, the data should eventually be replicated back to cluster 1.
    waitUntilReplicationDone(UTIL1, 1);
  }
}