001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.Arrays; 024import java.util.List; 025import java.util.Optional; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.concurrent.ForkJoinPool; 029import java.util.concurrent.atomic.AtomicInteger; 030import java.util.function.Supplier; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.HBaseTestingUtil; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.coprocessor.ObserverContext; 035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 036import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 037import org.apache.hadoop.hbase.coprocessor.RegionObserver; 038import org.apache.hadoop.hbase.regionserver.HRegion; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 041import org.junit.AfterClass; 042import org.junit.Rule; 043import org.junit.Test; 044import org.junit.rules.TestName; 045import org.junit.runners.Parameterized.Parameter; 046import org.junit.runners.Parameterized.Parameters; 047 048import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 049 050public abstract class AbstractTestAsyncTableRegionReplicasRead { 051 052 protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 053 054 protected static TableName TABLE_NAME = TableName.valueOf("async"); 055 056 protected static byte[] FAMILY = Bytes.toBytes("cf"); 057 058 protected static byte[] QUALIFIER = Bytes.toBytes("cq"); 059 060 protected static byte[] ROW = Bytes.toBytes("row"); 061 062 protected static byte[] VALUE = Bytes.toBytes("value"); 063 064 protected static int REPLICA_COUNT = 3; 065 066 protected static AsyncConnection ASYNC_CONN; 067 068 @Rule 069 public TestName testName = new TestName(); 070 071 @Parameter 072 public Supplier<AsyncTable<?>> getTable; 073 074 private static AsyncTable<?> getRawTable() { 075 return ASYNC_CONN.getTable(TABLE_NAME); 076 } 077 078 private static AsyncTable<?> getTable() { 079 return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); 080 } 081 082 @Parameters 083 public static List<Object[]> params() { 084 return Arrays.asList( 085 new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getRawTable }, 086 new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getTable }); 087 } 088 089 protected static volatile boolean FAIL_PRIMARY_GET = false; 090 091 protected static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT = 092 new ConcurrentHashMap<>(); 093 094 public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor { 095 096 @Override 097 public Optional<RegionObserver> getRegionObserver() { 098 return Optional.of(this); 099 } 100 101 private void recordAndTryFail(ObserverContext<? extends RegionCoprocessorEnvironment> c) 102 throws IOException { 103 RegionInfo region = c.getEnvironment().getRegionInfo(); 104 if (!region.getTable().equals(TABLE_NAME)) { 105 return; 106 } 107 REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger()) 108 .incrementAndGet(); 109 if (region.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) { 110 throw new IOException("Inject error"); 111 } 112 } 113 114 @Override 115 public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get, 116 List<Cell> result) throws IOException { 117 recordAndTryFail(c); 118 } 119 120 @Override 121 public void preScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c, Scan scan) 122 throws IOException { 123 recordAndTryFail(c); 124 } 125 } 126 127 private static boolean allReplicasHaveRow(byte[] row) throws IOException { 128 for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) { 129 for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) { 130 if (region.get(new Get(row), false).isEmpty()) { 131 return false; 132 } 133 } 134 } 135 return true; 136 } 137 138 protected static void startClusterAndCreateTable() throws Exception { 139 TEST_UTIL.startMiniCluster(3); 140 TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) 141 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT) 142 .setCoprocessor(FailPrimaryGetCP.class.getName()).build()); 143 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME); 144 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 145 } 146 147 protected static void waitUntilAllReplicasHaveRow(byte[] row) throws IOException { 148 // this is the fastest way to let all replicas have the row 149 TEST_UTIL.getAdmin().disableTable(TABLE_NAME); 150 TEST_UTIL.getAdmin().enableTable(TABLE_NAME); 151 TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow(row)); 152 } 153 154 @AfterClass 155 public static void tearDownAfterClass() throws Exception { 156 Closeables.close(ASYNC_CONN, true); 157 TEST_UTIL.shutdownMiniCluster(); 158 } 159 160 protected static int getSecondaryGetCount() { 161 return REPLICA_ID_TO_COUNT.entrySet().stream() 162 .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID) 163 .mapToInt(e -> e.getValue().get()).sum(); 164 } 165 166 protected static int getPrimaryGetCount() { 167 AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID); 168 return primaryGetCount != null ? primaryGetCount.get() : 0; 169 } 170 171 // replicaId = -1 means do not set replica 172 protected abstract void readAndCheck(AsyncTable<?> table, int replicaId) throws Exception; 173 174 @Test 175 public void testNoReplicaRead() throws Exception { 176 FAIL_PRIMARY_GET = false; 177 REPLICA_ID_TO_COUNT.clear(); 178 AsyncTable<?> table = getTable.get(); 179 readAndCheck(table, -1); 180 // the primary region is fine and the primary timeout is 1 second which is long enough, so we 181 // should not send any requests to secondary replicas even if the consistency is timeline. 182 Thread.sleep(5000); 183 assertEquals(0, getSecondaryGetCount()); 184 } 185 186 @Test 187 public void testReplicaRead() throws Exception { 188 // fail the primary get request 189 FAIL_PRIMARY_GET = true; 190 REPLICA_ID_TO_COUNT.clear(); 191 // make sure that we could still get the value from secondary replicas 192 AsyncTable<?> table = getTable.get(); 193 readAndCheck(table, -1); 194 // make sure that the primary request has been canceled 195 Thread.sleep(5000); 196 int count = getPrimaryGetCount(); 197 Thread.sleep(10000); 198 assertEquals(count, getPrimaryGetCount()); 199 } 200 201 @Test 202 public void testReadSpecificReplica() throws Exception { 203 FAIL_PRIMARY_GET = false; 204 REPLICA_ID_TO_COUNT.clear(); 205 AsyncTable<?> table = getTable.get(); 206 for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) { 207 readAndCheck(table, replicaId); 208 assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get()); 209 } 210 } 211}