001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.region; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.net.InetAddress; 026import java.security.cert.X509Certificate; 027import java.util.HashSet; 028import java.util.Map; 029import java.util.Optional; 030import java.util.Set; 031import org.apache.hadoop.hbase.ExtendedCellScanner; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.client.Get; 034import org.apache.hadoop.hbase.ipc.RpcCall; 035import org.apache.hadoop.hbase.ipc.RpcCallback; 036import org.apache.hadoop.hbase.ipc.RpcServer; 037import org.apache.hadoop.hbase.procedure2.Procedure; 038import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 039import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter; 040import org.apache.hadoop.hbase.security.User; 041import org.apache.hadoop.hbase.testclassification.MasterTests; 042import org.apache.hadoop.hbase.testclassification.SmallTests; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 052import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; 053import org.apache.hbase.thirdparty.com.google.protobuf.Message; 054 055import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 057 058@Category({ MasterTests.class, SmallTests.class }) 059public class TestRegionProcedureStore extends RegionProcedureStoreTestBase { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestRegionProcedureStore.class); 064 065 private static final Logger LOG = LoggerFactory.getLogger(TestRegionProcedureStore.class); 066 067 private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception { 068 LOG.debug("expected: " + procIds); 069 LoadCounter loader = new LoadCounter(); 070 ProcedureTestingUtility.storeRestart(store, true, loader); 071 assertEquals(procIds.size(), loader.getLoadedCount()); 072 assertEquals(0, loader.getCorruptedCount()); 073 } 074 075 @Test 076 public void testLoad() throws Exception { 077 Set<Long> procIds = new HashSet<>(); 078 079 // Insert something in the log 080 RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure(); 081 procIds.add(proc1.getProcId()); 082 store.insert(proc1, null); 083 084 RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure(); 085 RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure(); 086 proc3.setParent(proc2); 087 RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure(); 088 proc4.setParent(proc2); 089 090 procIds.add(proc2.getProcId()); 091 procIds.add(proc3.getProcId()); 092 procIds.add(proc4.getProcId()); 093 store.insert(proc2, new Procedure[] { proc3, proc4 }); 094 095 // Verify that everything is there 096 verifyProcIdsOnRestart(procIds); 097 098 // Update and delete something 099 proc1.finish(); 100 store.update(proc1); 101 proc4.finish(); 102 store.update(proc4); 103 store.delete(proc4.getProcId()); 104 procIds.remove(proc4.getProcId()); 105 106 // Verify that everything is there 107 verifyProcIdsOnRestart(procIds); 108 } 109 110 @Test 111 public void testCleanup() throws Exception { 112 RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure(); 113 store.insert(proc1, null); 114 RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure(); 115 store.insert(proc2, null); 116 RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure(); 117 store.insert(proc3, null); 118 LoadCounter loader = new LoadCounter(); 119 store.load(loader); 120 assertEquals(proc3.getProcId(), loader.getMaxProcId()); 121 assertEquals(3, loader.getRunnableCount()); 122 123 store.delete(proc3.getProcId()); 124 store.delete(proc2.getProcId()); 125 loader = new LoadCounter(); 126 store.load(loader); 127 assertEquals(proc3.getProcId(), loader.getMaxProcId()); 128 assertEquals(1, loader.getRunnableCount()); 129 130 // the row should still be there 131 assertTrue(store.region 132 .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists()); 133 assertTrue(store.region 134 .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists()); 135 136 // proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max proc 137 // id 138 store.cleanup(); 139 assertTrue(store.region 140 .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists()); 141 assertFalse(store.region 142 .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists()); 143 144 RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure(); 145 store.insert(proc4, null); 146 store.cleanup(); 147 // proc3 should also be deleted as now proc4 holds the max proc id 148 assertFalse(store.region 149 .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists()); 150 } 151 152 /** 153 * Test for HBASE-23895 154 */ 155 @Test 156 public void testInsertWithRpcCall() throws Exception { 157 RpcServer.setCurrentCall(newRpcCallWithDeadline()); 158 RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure(); 159 store.insert(proc1, null); 160 RpcServer.setCurrentCall(null); 161 } 162 163 @SuppressWarnings("checkstyle:methodlength") 164 private RpcCall newRpcCallWithDeadline() { 165 return new RpcCall() { 166 @Override 167 public long getDeadline() { 168 return EnvironmentEdgeManager.currentTime(); 169 } 170 171 @Override 172 public BlockingService getService() { 173 return null; 174 } 175 176 @Override 177 public Descriptors.MethodDescriptor getMethod() { 178 return null; 179 } 180 181 @Override 182 public Message getParam() { 183 return null; 184 } 185 186 @Override 187 public ExtendedCellScanner getCellScanner() { 188 return null; 189 } 190 191 @Override 192 public long getReceiveTime() { 193 return 0; 194 } 195 196 @Override 197 public long getStartTime() { 198 return 0; 199 } 200 201 @Override 202 public void setStartTime(long startTime) { 203 204 } 205 206 @Override 207 public int getTimeout() { 208 return 0; 209 } 210 211 @Override 212 public int getPriority() { 213 return 0; 214 } 215 216 @Override 217 public long getSize() { 218 return 0; 219 } 220 221 @Override 222 public RPCProtos.RequestHeader getHeader() { 223 return null; 224 } 225 226 @Override 227 public Map<String, byte[]> getConnectionAttributes() { 228 return null; 229 } 230 231 @Override 232 public Map<String, byte[]> getRequestAttributes() { 233 return null; 234 } 235 236 @Override 237 public byte[] getRequestAttribute(String key) { 238 return null; 239 } 240 241 @Override 242 public int getRemotePort() { 243 return 0; 244 } 245 246 @Override 247 public void setResponse(Message param, ExtendedCellScanner cells, Throwable errorThrowable, 248 String error) { 249 } 250 251 @Override 252 public void sendResponseIfReady() throws IOException { 253 } 254 255 @Override 256 public void cleanup() { 257 } 258 259 @Override 260 public String toShortString() { 261 return null; 262 } 263 264 @Override 265 public long disconnectSince() { 266 return 0; 267 } 268 269 @Override 270 public boolean isClientCellBlockSupported() { 271 return false; 272 } 273 274 @Override 275 public Optional<User> getRequestUser() { 276 return Optional.empty(); 277 } 278 279 @Override 280 public Optional<X509Certificate[]> getClientCertificateChain() { 281 return Optional.empty(); 282 } 283 284 @Override 285 public InetAddress getRemoteAddress() { 286 return null; 287 } 288 289 @Override 290 public HBaseProtos.VersionInfo getClientVersionInfo() { 291 return null; 292 } 293 294 @Override 295 public void setCallBack(RpcCallback callback) { 296 } 297 298 @Override 299 public boolean isRetryImmediatelySupported() { 300 return false; 301 } 302 303 @Override 304 public long getResponseCellSize() { 305 return 0; 306 } 307 308 @Override 309 public void incrementResponseCellSize(long cellSize) { 310 } 311 312 @Override 313 public long getBlockBytesScanned() { 314 return 0; 315 } 316 317 @Override 318 public void incrementBlockBytesScanned(long blockSize) { 319 } 320 321 @Override 322 public long getResponseExceptionSize() { 323 return 0; 324 } 325 326 @Override 327 public void incrementResponseExceptionSize(long exceptionSize) { 328 } 329 330 @Override 331 public void updateFsReadTime(long latencyMillis) { 332 333 } 334 335 @Override 336 public long getFsReadTime() { 337 return 0; 338 } 339 }; 340 } 341}