/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.BalanceRequest;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

/**
 * Test to ensure that the priority for procedures and stuck checker can partially solve the problem
 * described in HBASE-19976, that is, RecoverMetaProcedure can finally be executed within a certain
 * period of time.
 * <p>
 * As of HBASE-28199, we no longer block a worker when updating meta now, so this test can not test
 * adding procedure worker now, but it could still be used to make sure that we could make progress
 * when meta is gone and we have a lot of pending TRSPs.
 */
@Category({ MasterTests.class, LargeTests.class })
public class TestProcedurePriority {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestProcedurePriority.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  /** Prefix of every table created in {@link #setUp()}; the table index is appended to it. */
  private static final String TABLE_NAME_PREFIX = "TestProcedurePriority-";

  /** The only column family of the test tables. */
  private static final byte[] CF = Bytes.toBytes("cf");

  /** The qualifier written by the verification puts at the end of {@link #test()}. */
  private static final byte[] CQ = Bytes.toBytes("cq");

  /** Core pool size of the master procedure executor, read once the cluster is up. */
  private static int CORE_POOL_SIZE;

  /** Number of tables created in {@link #setUp()}, computed as {@code 50 * CORE_POOL_SIZE}. */
  private static int TABLE_COUNT;

  /** While {@code true}, {@link MyCP} rejects every get and put issued against a meta region. */
  private static volatile boolean FAIL = false;

  /**
   * Region coprocessor used for fault injection: while {@link #FAIL} is set, gets and puts on a
   * meta region fail with an {@link IOException}. Operations on any other region are untouched.
   */
  public static final class MyCP implements RegionObserver, RegionCoprocessor {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get,
      List<Cell> result) throws IOException {
      if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) {
        throw new IOException("Inject error");
      }
    }

    @Override
    public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> c, Put put,
      WALEdit edit, Durability durability) throws IOException {
      if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) {
        throw new IOException("Inject error");
      }
    }
  }

  /**
   * Starts the mini cluster with {@link MyCP} installed as a region coprocessor, creates
   * {@link #TABLE_COUNT} single-family tables, runs the balancer once and then switches it off.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setLong(ProcedureExecutor.WORKER_KEEP_ALIVE_TIME_CONF_KEY, 5000);
    UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 4);
    UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCP.class.getName());
    UTIL.startMiniCluster(3);
    CORE_POOL_SIZE =
      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getCorePoolSize();
    TABLE_COUNT = 50 * CORE_POOL_SIZE;
    List<Future<?>> futures = new ArrayList<>();
    AsyncAdmin admin = UTIL.getAsyncConnection().getAdmin();
    // allow at most 10 table creations to be in flight at the same time
    Semaphore concurrency = new Semaphore(10);
    for (int i = 0; i < TABLE_COUNT; i++) {
      concurrency.acquire();
      futures.add(admin
        .createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME_PREFIX + i))
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build())
        .whenComplete((r, e) -> concurrency.release()));
    }
    for (Future<?> future : futures) {
      future.get(3, TimeUnit.MINUTES);
    }
    UTIL.getAdmin().balance(BalanceRequest.newBuilder().setIgnoreRegionsInTransition(true).build());
    UTIL.waitUntilNoRegionsInTransition();
    UTIL.getAdmin().balancerSwitch(false, true);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Makes meta gets/puts fail, kills a region server which does not host meta so that a lot of
   * TRSPs pile up, then kills the region server hosting meta as well, stops the error injection
   * and verifies that the cluster recovers: no regions in transition, every table is writable and
   * the executor's worker thread count is equal to the core pool size.
   */
  @Test
  public void test() throws Exception {
    RegionServerThread rsWithMetaThread = UTIL.getMiniHBaseCluster().getRegionServerThreads()
      .stream().filter(t -> !t.getRegionServer().getRegions(TableName.META_TABLE_NAME).isEmpty())
      .findAny()
      .orElseThrow(() -> new AssertionError("No region server is hosting the meta region"));
    HRegionServer rsNoMeta = UTIL.getOtherRegionServer(rsWithMetaThread.getRegionServer());
    ProcedureExecutor<?> executor =
      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
    FAIL = true;
    try {
      UTIL.getMiniHBaseCluster().killRegionServer(rsNoMeta.getServerName());
      // wait until we have way more TRSPs than the core pool size, and then make sure we can
      // recover normally
      UTIL.waitFor(60000, new ExplainingPredicate<Exception>() {

        @Override
        public boolean evaluate() throws Exception {
          return executor.getProcedures().stream().filter(p -> !p.isFinished())
            .filter(p -> p.getState() != ProcedureState.INITIALIZING)
            .filter(p -> p instanceof TransitRegionStateProcedure).count() > 5 * CORE_POOL_SIZE;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Not enough TRSPs scheduled";
        }
      });
      // sleep more time to make sure the TRSPs have been executed
      Thread.sleep(10000);
      UTIL.getMiniHBaseCluster()
        .killRegionServer(rsWithMetaThread.getRegionServer().getServerName());
      rsWithMetaThread.join();
    } finally {
      // FAIL is static, so always switch the error injection off here, otherwise a failure above
      // would leave it enabled after this test method has returned
      FAIL = false;
    }
    // verify that the cluster is back
    UTIL.waitUntilNoRegionsInTransition(480000);
    for (int i = 0; i < TABLE_COUNT; i++) {
      try (Table table = UTIL.getConnection().getTable(TableName.valueOf(TABLE_NAME_PREFIX + i))) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
      }
    }
    // the executor should end up with exactly the core pool size of worker threads
    UTIL.waitFor(60000, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        return executor.getWorkerThreadCount() == CORE_POOL_SIZE;
      }

      @Override
      public String explainFailure() throws Exception {
        return "The new workers do not timeout";
      }
    });
  }
}