/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;

/**
 * Tests for {@link RegistryEndpointsRefresher}: disabling the refresher through configuration,
 * the initial delay, periodic refreshes, and the minimum interval enforced between refreshes.
 * <p>
 * The refresh callback used by every test only bumps a counter and records a timestamp, so the
 * assertions are made purely on how often and when the callback was invoked.
 */
@Category({ ClientTests.class, SmallTests.class })
public class TestRegistryEndpointsRefresher {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegistryEndpointsRefresher.class);

  // Test-only configuration keys; their names are handed to RegistryEndpointsRefresher.create.
  private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
    "hbase.test.registry.initial.delay.secs";
  private static final String INTERVAL_SECS_CONFIG_NAME =
    "hbase.test.registry.refresh.interval.secs";
  private static final String MIN_INTERVAL_SECS_CONFIG_NAME =
    "hbase.test.registry.refresh.min.interval.secs";

  private Configuration conf;
  private RegistryEndpointsRefresher refresher;
  /** Number of times the refresh callback has been invoked. */
  private AtomicInteger refreshCallCounter;
  /** Time of each refresh callback invocation, in invocation order. */
  private CopyOnWriteArrayList<Long> callTimestamps;

  @Before
  public void setUp() {
    conf = HBaseConfiguration.create();
    refreshCallCounter = new AtomicInteger(0);
    callTimestamps = new CopyOnWriteArrayList<>();
  }

  @After
  public void tearDown() {
    // testDisableRefresh never assigns a refresher, hence the null check.
    if (refresher != null) {
      refresher.stop();
    }
  }

  /** Refresh callback handed to the refresher: counts the call and records when it happened. */
  private void refresh() {
    refreshCallCounter.incrementAndGet();
    callTimestamps.add(EnvironmentEdgeManager.currentTime());
  }

  /**
   * Stores the given timings in {@link #conf} under the test config keys and creates the
   * refresher under test with {@link #refresh()} as its callback.
   * @param initialDelaySecs value for the initial delay key, in seconds
   * @param intervalSecs     value for the periodic refresh interval key, in seconds
   * @param minIntervalSecs  value for the minimum interval between refreshes key, in seconds
   */
  private void createRefresher(long initialDelaySecs, long intervalSecs, long minIntervalSecs) {
    conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, initialDelaySecs);
    conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
    conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
    refresher = RegistryEndpointsRefresher.create(conf, INITIAL_DELAY_SECS_CONFIG_NAME,
      INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
  }

  /** A negative refresh interval must disable the refresher, i.e. {@code create} returns null. */
  @Test
  public void testDisableRefresh() {
    conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1);
    // Pass the config names in the same order as createRefresher does: initial delay, interval,
    // minimum interval.
    assertNull(RegistryEndpointsRefresher.create(conf, INITIAL_DELAY_SECS_CONFIG_NAME,
      INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
  }

  /**
   * With a 1 second initial delay and a 10 second interval, exactly one refresh is expected
   * within the first few seconds.
   */
  @Test
  public void testInitialDelay() throws InterruptedException {
    createRefresher(1, 10, 0);
    // Wait up to 2 seconds for the first refresh, since the initial delay is 1 second.
    Waiter.waitFor(conf, 2000, () -> refreshCallCounter.get() == 1);
    // Sleep 5 more seconds to make sure no further refresh has been made, since the interval is
    // 10 seconds.
    Thread.sleep(5000);
    assertEquals(1, refreshCallCounter.get());
  }

  /** With a 1 second interval, refreshes must keep happening periodically. */
  @Test
  public void testPeriodicMasterEndPointRefresh() {
    // Refresh every 1 second.
    createRefresher(1, 1, 0);
    // Wait up to 5 seconds to see that more than 3 refreshes have been made.
    Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
  }

  /**
   * With periodic refresh effectively disabled and a 1 second minimum interval, a flood of manual
   * refresh requests must be throttled so that consecutive refreshes are about 1 second apart.
   */
  @Test
  public void testDurationBetweenRefreshes() {
    // Disable periodic refresh
    // A minimum duration of 1s between refreshes
    createRefresher(Integer.MAX_VALUE, Integer.MAX_VALUE, 1);
    // Issue a ton of manual refreshes.
    for (int i = 0; i < 10000; i++) {
      refresher.refreshNow();
      Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
    }
    // The loop sleeps for a nominal 10000 ms in total, so with a 1s minimum interval roughly 10
    // refreshes are expected. Each 1 ms sleep may take longer than requested, so allow up to 20.
    // Either way the actual calls to refresh must be far fewer than the 10000 requests.
    int refreshCalls = refreshCallCounter.get();
    assertTrue(String.valueOf(refreshCalls), refreshCalls <= 20);
    assertTrue(callTimestamps.size() > 0);
    // Verify that the delta between every pair of subsequent refreshes, including the last pair,
    // is at least 1 sec as configured.
    for (int i = 1; i < callTimestamps.size(); i++) {
      long delta = callTimestamps.get(i) - callTimestamps.get(i - 1);
      // Few ms cushion to account for any env jitter.
      assertTrue(callTimestamps.toString(), delta > 990);
    }
  }
}