001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion; 021import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion; 022import static org.mockito.Mockito.mock; 023import static org.mockito.Mockito.when; 024 025import java.io.IOException; 026import java.util.Optional; 027import java.util.Queue; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.concurrent.atomic.AtomicLong; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.HRegionInfo; 036import org.apache.hadoop.hbase.HTableDescriptor; 037import org.apache.hadoop.hbase.RegionLocations; 038import org.apache.hadoop.hbase.StartMiniClusterOption; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.ClusterConnection; 041import org.apache.hadoop.hbase.client.ConnectionFactory; 042import org.apache.hadoop.hbase.client.RegionInfo; 043import org.apache.hadoop.hbase.client.RegionLocator; 044import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 045import org.apache.hadoop.hbase.client.Table; 046import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 047import org.apache.hadoop.hbase.coprocessor.ObserverContext; 048import org.apache.hadoop.hbase.coprocessor.WALCoprocessor; 049import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment; 050import org.apache.hadoop.hbase.coprocessor.WALObserver; 051import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 052import org.apache.hadoop.hbase.regionserver.HRegionServer; 053import org.apache.hadoop.hbase.regionserver.Region; 054import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster; 055import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 056import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext; 057import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable; 058import org.apache.hadoop.hbase.testclassification.MediumTests; 059import org.apache.hadoop.hbase.testclassification.ReplicationTests; 060import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 061import org.apache.hadoop.hbase.wal.WAL.Entry; 062import org.apache.hadoop.hbase.wal.WALEdit; 063import org.apache.hadoop.hbase.wal.WALKey; 064import org.apache.hadoop.hbase.wal.WALKeyImpl; 065import org.junit.After; 066import org.junit.AfterClass; 067import org.junit.Assert; 068import org.junit.Before; 069import org.junit.BeforeClass; 070import org.junit.ClassRule; 071import org.junit.Test; 072import org.junit.experimental.categories.Category; 073 074import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 075 076import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 077 078/** 079 * Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this class 080 * contains lower level tests using callables. 081 */ 082@Category({ ReplicationTests.class, MediumTests.class }) 083public class TestRegionReplicaReplicationEndpointNoMaster { 084 085 @ClassRule 086 public static final HBaseClassTestRule CLASS_RULE = 087 HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpointNoMaster.class); 088 089 private static final int NB_SERVERS = 2; 090 private static TableName tableName = 091 TableName.valueOf(TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName()); 092 private static Table table; 093 private static final byte[] row = "TestRegionReplicaReplicator".getBytes(); 094 095 private static HRegionServer rs0; 096 private static HRegionServer rs1; 097 098 private static HRegionInfo hriPrimary; 099 private static HRegionInfo hriSecondary; 100 101 private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); 102 private static final byte[] f = HConstants.CATALOG_FAMILY; 103 104 @BeforeClass 105 public static void beforeClass() throws Exception { 106 Configuration conf = HTU.getConfiguration(); 107 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); 108 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false); 109 110 // install WALObserver coprocessor for tests 111 String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY); 112 if (walCoprocs == null) { 113 walCoprocs = WALEditCopro.class.getName(); 114 } else { 115 walCoprocs += "," + WALEditCopro.class.getName(); 116 } 117 HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, walCoprocs); 118 StartMiniClusterOption option = StartMiniClusterOption.builder().numAlwaysStandByMasters(1) 119 .numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build(); 120 HTU.startMiniCluster(option); 121 122 // Create table then get the single region for our new table. 123 HTableDescriptor htd = HTU.createTableDescriptor(tableName.getNameAsString()); 124 table = HTU.createTable(htd, new byte[][] { f }, null); 125 126 try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) { 127 hriPrimary = locator.getRegionLocation(row, false).getRegionInfo(); 128 } 129 130 // mock a secondary region info to open 131 hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(), 132 hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1); 133 134 // No master 135 TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU); 136 rs0 = HTU.getMiniHBaseCluster().getRegionServer(0); 137 rs1 = HTU.getMiniHBaseCluster().getRegionServer(1); 138 } 139 140 @AfterClass 141 public static void afterClass() throws Exception { 142 HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false; 143 table.close(); 144 HTU.shutdownMiniCluster(); 145 } 146 147 @Before 148 public void before() throws Exception { 149 entries.clear(); 150 } 151 152 @After 153 public void after() throws Exception { 154 } 155 156 static ConcurrentLinkedQueue<Entry> entries = new ConcurrentLinkedQueue<>(); 157 158 public static class WALEditCopro implements WALCoprocessor, WALObserver { 159 public WALEditCopro() { 160 entries.clear(); 161 } 162 163 @Override 164 public Optional<WALObserver> getWALObserver() { 165 return Optional.of(this); 166 } 167 168 @Override 169 public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx, 170 RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { 171 // only keep primary region's edits 172 if (logKey.getTableName().equals(tableName) && info.getReplicaId() == 0) { 173 // Presume type is a WALKeyImpl 174 entries.add(new Entry((WALKeyImpl) logKey, logEdit)); 175 } 176 } 177 } 178 179 @Test 180 public void testReplayCallable() throws Exception { 181 // tests replaying the edits to a secondary region replica using the Callable directly 182 openRegion(HTU, rs0, hriSecondary); 183 ClusterConnection connection = 184 (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration()); 185 186 // load some data to primary 187 HTU.loadNumericRows(table, f, 0, 1000); 188 189 Assert.assertEquals(1000, entries.size()); 190 // replay the edits to the secondary using replay callable 191 replicateUsingCallable(connection, entries); 192 193 Region region = rs0.getRegion(hriSecondary.getEncodedName()); 194 HTU.verifyNumericRows(region, f, 0, 1000); 195 196 HTU.deleteNumericRows(table, f, 0, 1000); 197 closeRegion(HTU, rs0, hriSecondary); 198 connection.close(); 199 } 200 201 private void replicateUsingCallable(ClusterConnection connection, Queue<Entry> entries) 202 throws IOException, RuntimeException { 203 Entry entry; 204 while ((entry = entries.poll()) != null) { 205 byte[] row = CellUtil.cloneRow(entry.getEdit().getCells().get(0)); 206 RegionLocations locations = connection.locateRegion(tableName, row, true, true); 207 RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection, 208 RpcControllerFactory.instantiate(connection.getConfiguration()), table.getName(), 209 locations.getRegionLocation(1), locations.getRegionLocation(1).getRegionInfo(), row, 210 Lists.newArrayList(entry), new AtomicLong()); 211 212 RpcRetryingCallerFactory factory = 213 RpcRetryingCallerFactory.instantiate(connection.getConfiguration(), 214 connection.getConnectionConfiguration(), connection.getConnectionMetrics()); 215 factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000); 216 } 217 } 218 219 @Test 220 public void testReplayCallableWithRegionMove() throws Exception { 221 // tests replaying the edits to a secondary region replica using the Callable directly while 222 // the region is moved to another location.It tests handling of RME. 223 openRegion(HTU, rs0, hriSecondary); 224 ClusterConnection connection = 225 (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration()); 226 // load some data to primary 227 HTU.loadNumericRows(table, f, 0, 1000); 228 229 Assert.assertEquals(1000, entries.size()); 230 // replay the edits to the secondary using replay callable 231 replicateUsingCallable(connection, entries); 232 233 Region region = rs0.getRegion(hriSecondary.getEncodedName()); 234 HTU.verifyNumericRows(region, f, 0, 1000); 235 236 HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary 237 238 // move the secondary region from RS0 to RS1 239 closeRegion(HTU, rs0, hriSecondary); 240 openRegion(HTU, rs1, hriSecondary); 241 242 // replicate the new data 243 replicateUsingCallable(connection, entries); 244 245 region = rs1.getRegion(hriSecondary.getEncodedName()); 246 // verify the new data. old data may or may not be there 247 HTU.verifyNumericRows(region, f, 1000, 2000); 248 249 HTU.deleteNumericRows(table, f, 0, 2000); 250 closeRegion(HTU, rs1, hriSecondary); 251 connection.close(); 252 } 253 254 @Test 255 public void testRegionReplicaReplicationEndpointReplicate() throws Exception { 256 // tests replaying the edits to a secondary region replica using the RRRE.replicate() 257 openRegion(HTU, rs0, hriSecondary); 258 ClusterConnection connection = 259 (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration()); 260 RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint(); 261 262 ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class); 263 when(context.getConfiguration()).thenReturn(HTU.getConfiguration()); 264 when(context.getMetrics()).thenReturn(mock(MetricsSource.class)); 265 266 replicator.init(context); 267 replicator.startAsync(); 268 269 // load some data to primary 270 HTU.loadNumericRows(table, f, 0, 1000); 271 272 Assert.assertEquals(1000, entries.size()); 273 // replay the edits to the secondary using replay callable 274 final String fakeWalGroupId = "fakeWALGroup"; 275 replicator.replicate( 276 new ReplicateContext().setEntries(Lists.newArrayList(entries)).setWalGroupId(fakeWalGroupId)); 277 278 Region region = rs0.getRegion(hriSecondary.getEncodedName()); 279 HTU.verifyNumericRows(region, f, 0, 1000); 280 281 HTU.deleteNumericRows(table, f, 0, 1000); 282 closeRegion(HTU, rs0, hriSecondary); 283 connection.close(); 284 } 285}