001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.normalizer; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.contains; 022import static org.hamcrest.Matchers.greaterThan; 023import static org.hamcrest.Matchers.lessThanOrEqualTo; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.fail; 026 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collection; 030import java.util.HashSet; 031import java.util.Iterator; 032import java.util.LinkedList; 033import java.util.List; 034import java.util.Random; 035import java.util.concurrent.CompletableFuture; 036import java.util.concurrent.ConcurrentLinkedQueue; 037import java.util.concurrent.ThreadLocalRandom; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicBoolean; 040import java.util.stream.Collectors; 041import java.util.stream.IntStream; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseConfiguration; 044import org.apache.hadoop.hbase.Waiter; 045import org.apache.hadoop.hbase.testclassification.MasterTests; 046import org.apache.hadoop.hbase.testclassification.SmallTests; 047import org.junit.ClassRule; 048import org.junit.Rule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.junit.rules.TestName; 052 053/** 054 * Tests that {@link RegionNormalizerWorkQueue} implements the contract described in its docstring. 055 */ 056@Category({ MasterTests.class, SmallTests.class }) 057public class TestRegionNormalizerWorkQueue { 058 059 @ClassRule 060 public static final HBaseClassTestRule CLASS_RULE = 061 HBaseClassTestRule.forClass(TestRegionNormalizerWorkQueue.class); 062 063 @Rule 064 public TestName testName = new TestName(); 065 066 @Test 067 public void testElementUniquenessAndFIFO() throws Exception { 068 final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>(); 069 final List<Integer> content = new LinkedList<>(); 070 IntStream.of(4, 3, 2, 1, 4, 3, 2, 1).boxed().forEach(queue::put); 071 assertEquals(4, queue.size()); 072 while (queue.size() > 0) { 073 content.add(queue.take()); 074 } 075 assertThat(content, contains(4, 3, 2, 1)); 076 077 queue.clear(); 078 queue.putAll(Arrays.asList(4, 3, 2, 1)); 079 queue.putAll(Arrays.asList(4, 5)); 080 assertEquals(5, queue.size()); 081 content.clear(); 082 while (queue.size() > 0) { 083 content.add(queue.take()); 084 } 085 assertThat(content, contains(4, 3, 2, 1, 5)); 086 } 087 088 @Test 089 public void testPriorityAndFIFO() throws Exception { 090 final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>(); 091 final List<Integer> content = new LinkedList<>(); 092 queue.putAll(Arrays.asList(4, 3, 2, 1)); 093 assertEquals(4, queue.size()); 094 queue.putFirst(0); 095 assertEquals(5, queue.size()); 096 drainTo(queue, content); 097 assertThat("putFirst items should jump the queue, preserving existing order", content, 098 contains(0, 4, 3, 2, 1)); 099 100 queue.clear(); 101 content.clear(); 102 queue.putAll(Arrays.asList(4, 3, 2, 1)); 103 queue.putFirst(1); 104 assertEquals(4, queue.size()); 105 drainTo(queue, content); 106 assertThat("existing items re-added with putFirst should jump the queue", content, 107 contains(1, 4, 3, 2)); 108 109 queue.clear(); 110 content.clear(); 111 queue.putAll(Arrays.asList(4, 3, 2, 1)); 112 queue.putAllFirst(Arrays.asList(2, 3)); 113 assertEquals(4, queue.size()); 114 drainTo(queue, content); 115 assertThat( 116 "existing items re-added with putAllFirst jump the queue AND honor changes in priority", 117 content, contains(2, 3, 4, 1)); 118 } 119 120 private enum Action { 121 PUT, 122 PUT_FIRST, 123 PUT_ALL, 124 PUT_ALL_FIRST, 125 } 126 127 /** 128 * Test that the uniqueness constraint is honored in the face of concurrent modification. 129 */ 130 @Test 131 public void testConcurrentPut() throws Exception { 132 final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>(); 133 final int maxValue = 100; 134 final Runnable producer = () -> { 135 final Random rand = ThreadLocalRandom.current(); 136 for (int i = 0; i < 1_000; i++) { 137 final Action action = Action.values()[rand.nextInt(Action.values().length)]; 138 switch (action) { 139 case PUT: { 140 final int val = rand.nextInt(maxValue); 141 queue.put(val); 142 break; 143 } 144 case PUT_FIRST: { 145 final int val = rand.nextInt(maxValue); 146 queue.putFirst(val); 147 break; 148 } 149 case PUT_ALL: { 150 final List<Integer> vals = 151 rand.ints(5, 0, maxValue).boxed().collect(Collectors.toList()); 152 queue.putAll(vals); 153 break; 154 } 155 case PUT_ALL_FIRST: { 156 final List<Integer> vals = 157 rand.ints(5, 0, maxValue).boxed().collect(Collectors.toList()); 158 queue.putAllFirst(vals); 159 break; 160 } 161 default: 162 fail("Unrecognized action " + action); 163 } 164 } 165 }; 166 167 final int numThreads = 5; 168 final CompletableFuture<?>[] futures = IntStream.range(0, numThreads) 169 .mapToObj(val -> CompletableFuture.runAsync(producer)).toArray(CompletableFuture<?>[]::new); 170 CompletableFuture.allOf(futures).join(); 171 172 final List<Integer> content = new ArrayList<>(queue.size()); 173 drainTo(queue, content); 174 assertThat("at most `maxValue` items should be present.", content.size(), 175 lessThanOrEqualTo(maxValue)); 176 assertEquals("all items should be unique.", content.size(), new HashSet<>(content).size()); 177 } 178 179 /** 180 * Test that calls to {@link RegionNormalizerWorkQueue#take()} block the requesting thread. The 181 * producing thread places new entries onto the queue following a known schedule. The consuming 182 * thread collects a time measurement between calls to {@code take}. Finally, the test makes 183 * coarse-grained assertions of the consumer's observations based on the producer's schedule. 184 */ 185 @Test 186 public void testTake() throws Exception { 187 final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>(); 188 final ConcurrentLinkedQueue<Long> takeTimes = new ConcurrentLinkedQueue<>(); 189 final AtomicBoolean finished = new AtomicBoolean(false); 190 final int count = 5; 191 final Runnable consumer = () -> { 192 try { 193 while (!finished.get()) { 194 queue.take(); 195 takeTimes.add(System.nanoTime()); 196 } 197 } catch (InterruptedException e) { 198 fail("interrupted."); 199 } 200 }; 201 202 CompletableFuture<Void> worker = CompletableFuture.runAsync(consumer); 203 final long testStart = System.nanoTime(); 204 for (int i = 0; i < count; i++) { 205 Thread.sleep(10); 206 queue.put(i); 207 } 208 // should have timing information for 5 calls to take. 209 Waiter.waitFor(HBaseConfiguration.create(), 1000, () -> count == takeTimes.size()); 210 // set finished = true and pipe one more value in case the thread needs an extra pass through 211 // the loop. 212 finished.set(true); 213 queue.put(1); 214 worker.get(1, TimeUnit.SECONDS); 215 216 final Iterator<Long> times = takeTimes.iterator(); 217 for (int i = 0; i < count; i++) { 218 assertThat( 219 "Observations collected in takeTimes should increase by roughly 10ms every interval", 220 times.next(), greaterThan(testStart + TimeUnit.MILLISECONDS.toNanos(i * 10))); 221 } 222 } 223 224 private static <E> void drainTo(final RegionNormalizerWorkQueue<E> queue, Collection<E> dest) 225 throws InterruptedException { 226 assertThat(queue.size(), greaterThan(0)); 227 while (queue.size() > 0) { 228 dest.add(queue.take()); 229 } 230 } 231}