001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertTrue; 021import static org.mockito.ArgumentMatchers.any; 022import static org.mockito.ArgumentMatchers.anyInt; 023import static org.mockito.ArgumentMatchers.anyLong; 024import static org.mockito.Mockito.doAnswer; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import java.io.IOException; 029import java.util.Collections; 030import java.util.concurrent.CompletableFuture; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicBoolean; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HRegionLocation; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.ipc.HBaseRpcController; 041import org.apache.hadoop.hbase.security.User; 042import org.apache.hadoop.hbase.testclassification.RegionServerTests; 043import org.apache.hadoop.hbase.testclassification.SmallTests; 044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 045import org.junit.AfterClass; 046import org.junit.BeforeClass; 047import org.junit.ClassRule; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050 051import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 052import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 053 054import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.Interface; 056 057/** 058 * Make sure we could fallback to use replay method if replicateToReplica method is not present, 059 * i.e, we are connecting an old region server. 060 */ 061@Category({ RegionServerTests.class, SmallTests.class }) 062public class TestFallbackToUseReplay { 063 064 @ClassRule 065 public static final HBaseClassTestRule CLASS_RULE = 066 HBaseClassTestRule.forClass(TestFallbackToUseReplay.class); 067 068 private static Configuration CONF = HBaseConfiguration.create(); 069 070 private static AsyncClusterConnectionImpl CONN; 071 072 private static AsyncRegionReplicationRetryingCaller CALLER; 073 074 private static RegionInfo REPLICA = 075 RegionInfoBuilder.newBuilder(TableName.valueOf("test")).setReplicaId(1).build(); 076 077 private static AtomicBoolean REPLAY_CALLED = new AtomicBoolean(false); 078 079 @BeforeClass 080 public static void setUpBeforeClass() throws IOException { 081 CONF.setInt(AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 082 AsyncRegionLocator locator = mock(AsyncRegionLocator.class); 083 when(locator.getRegionLocation(any(), any(), anyInt(), any(), anyLong())) 084 .thenReturn(CompletableFuture.completedFuture(new HRegionLocation(REPLICA, 085 ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime())))); 086 AdminService.Interface stub = mock(AdminService.Interface.class); 087 // fail the call to replicateToReplica 088 doAnswer(i -> { 089 HBaseRpcController controller = i.getArgument(0, HBaseRpcController.class); 090 controller.setFailed(new DoNotRetryIOException(new UnsupportedOperationException())); 091 RpcCallback<?> done = i.getArgument(2, RpcCallback.class); 092 done.run(null); 093 return null; 094 }).when(stub).replicateToReplica(any(), any(), any()); 095 doAnswer(i -> { 096 REPLAY_CALLED.set(true); 097 RpcCallback<?> done = i.getArgument(2, RpcCallback.class); 098 done.run(null); 099 return null; 100 }).when(stub).replay(any(), any(), any()); 101 CONN = new AsyncClusterConnectionImpl(CONF, mock(ConnectionRegistry.class), "test", null, 102 User.getCurrent()) { 103 104 @Override 105 AsyncRegionLocator getLocator() { 106 return locator; 107 } 108 109 @Override 110 Interface getAdminStub(ServerName serverName) throws IOException { 111 return stub; 112 } 113 }; 114 CALLER = new AsyncRegionReplicationRetryingCaller(AsyncClusterConnectionImpl.RETRY_TIMER, CONN, 115 10, TimeUnit.SECONDS.toNanos(1), TimeUnit.SECONDS.toNanos(10), REPLICA, 116 Collections.emptyList()); 117 } 118 119 @AfterClass 120 public static void tearDownAfterClass() throws IOException { 121 Closeables.close(CONN, true); 122 } 123 124 @Test 125 public void testFallback() { 126 CALLER.call().join(); 127 assertTrue(REPLAY_CALLED.get()); 128 } 129}