001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertSame; 024import static org.junit.Assert.assertTrue; 025 026import java.util.Comparator; 027import java.util.PriorityQueue; 028import java.util.concurrent.CyclicBarrier; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.Executors; 031import java.util.concurrent.TimeUnit; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.testclassification.MiscTests; 034import org.apache.hadoop.hbase.testclassification.SmallTests; 035import org.junit.After; 036import org.junit.Before; 037import org.junit.ClassRule; 038import org.junit.Test; 039import org.junit.experimental.categories.Category; 040 041@Category({ MiscTests.class, SmallTests.class }) 042public class TestBoundedPriorityBlockingQueue { 043 044 @ClassRule 045 public static final HBaseClassTestRule CLASS_RULE = 046 HBaseClassTestRule.forClass(TestBoundedPriorityBlockingQueue.class); 047 048 private final static int CAPACITY = 16; 049 050 static class TestObject { 051 private final int priority; 052 private final int seqId; 053 054 public TestObject(final int priority, final int seqId) { 055 this.priority = priority; 056 this.seqId = seqId; 057 } 058 059 public int getSeqId() { 060 return this.seqId; 061 } 062 063 public int getPriority() { 064 return this.priority; 065 } 066 } 067 068 static class TestObjectComparator implements Comparator<TestObject> { 069 public TestObjectComparator() { 070 } 071 072 @Override 073 public int compare(TestObject a, TestObject b) { 074 return a.getPriority() - b.getPriority(); 075 } 076 } 077 078 private BoundedPriorityBlockingQueue<TestObject> queue; 079 080 @Before 081 public void setUp() throws Exception { 082 this.queue = new BoundedPriorityBlockingQueue<>(CAPACITY, new TestObjectComparator()); 083 } 084 085 @After 086 public void tearDown() throws Exception { 087 } 088 089 @Test 090 public void tesAppend() throws Exception { 091 // Push 092 for (int i = 1; i <= CAPACITY; ++i) { 093 assertTrue(queue.offer(new TestObject(i, i))); 094 assertEquals(i, queue.size()); 095 assertEquals(CAPACITY - i, queue.remainingCapacity()); 096 } 097 assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS)); 098 099 // Pop 100 for (int i = 1; i <= CAPACITY; ++i) { 101 TestObject obj = queue.poll(); 102 assertEquals(i, obj.getSeqId()); 103 assertEquals(CAPACITY - i, queue.size()); 104 assertEquals(i, queue.remainingCapacity()); 105 } 106 assertEquals(null, queue.poll()); 107 } 108 109 @Test 110 public void tesAppendSamePriority() throws Exception { 111 // Push 112 for (int i = 1; i <= CAPACITY; ++i) { 113 assertTrue(queue.offer(new TestObject(0, i))); 114 assertEquals(i, queue.size()); 115 assertEquals(CAPACITY - i, queue.remainingCapacity()); 116 } 117 assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS)); 118 119 // Pop 120 for (int i = 1; i <= CAPACITY; ++i) { 121 TestObject obj = queue.poll(); 122 assertEquals(i, obj.getSeqId()); 123 assertEquals(CAPACITY - i, queue.size()); 124 assertEquals(i, queue.remainingCapacity()); 125 } 126 assertEquals(null, queue.poll()); 127 } 128 129 @Test 130 public void testPrepend() throws Exception { 131 // Push 132 for (int i = 1; i <= CAPACITY; ++i) { 133 assertTrue(queue.offer(new TestObject(CAPACITY - i, i))); 134 assertEquals(i, queue.size()); 135 assertEquals(CAPACITY - i, queue.remainingCapacity()); 136 } 137 138 // Pop 139 for (int i = 1; i <= CAPACITY; ++i) { 140 TestObject obj = queue.poll(); 141 assertEquals(CAPACITY - (i - 1), obj.getSeqId()); 142 assertEquals(CAPACITY - i, queue.size()); 143 assertEquals(i, queue.remainingCapacity()); 144 } 145 assertEquals(null, queue.poll()); 146 } 147 148 @Test 149 public void testInsert() throws Exception { 150 // Push 151 for (int i = 1; i <= CAPACITY; i += 2) { 152 assertTrue(queue.offer(new TestObject(i, i))); 153 assertEquals((1 + i) / 2, queue.size()); 154 } 155 for (int i = 2; i <= CAPACITY; i += 2) { 156 assertTrue(queue.offer(new TestObject(i, i))); 157 assertEquals(CAPACITY / 2 + (i / 2), queue.size()); 158 } 159 assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS)); 160 161 // Pop 162 for (int i = 1; i <= CAPACITY; ++i) { 163 TestObject obj = queue.poll(); 164 assertEquals(i, obj.getSeqId()); 165 assertEquals(CAPACITY - i, queue.size()); 166 assertEquals(i, queue.remainingCapacity()); 167 } 168 assertEquals(null, queue.poll()); 169 } 170 171 @Test 172 public void testFifoSamePriority() throws Exception { 173 assertTrue(CAPACITY >= 6); 174 for (int i = 0; i < 6; ++i) { 175 assertTrue(queue.offer(new TestObject((1 + (i % 2)) * 10, i))); 176 } 177 178 for (int i = 0; i < 6; i += 2) { 179 TestObject obj = queue.poll(); 180 assertEquals(10, obj.getPriority()); 181 assertEquals(i, obj.getSeqId()); 182 } 183 184 for (int i = 1; i < 6; i += 2) { 185 TestObject obj = queue.poll(); 186 assertEquals(20, obj.getPriority()); 187 assertEquals(i, obj.getSeqId()); 188 } 189 assertEquals(null, queue.poll()); 190 } 191 192 @Test 193 public void testPoll() { 194 assertNull(queue.poll()); 195 PriorityQueue<TestObject> testList = new PriorityQueue<>(CAPACITY, new TestObjectComparator()); 196 197 for (int i = 0; i < CAPACITY; ++i) { 198 TestObject obj = new TestObject(i, i); 199 testList.add(obj); 200 queue.offer(obj); 201 } 202 203 for (int i = 0; i < CAPACITY; ++i) { 204 assertEquals(testList.poll(), queue.poll()); 205 } 206 207 assertNull(null, queue.poll()); 208 } 209 210 @Test 211 public void testPollInExecutor() throws InterruptedException { 212 final TestObject testObj = new TestObject(0, 0); 213 214 final CyclicBarrier threadsStarted = new CyclicBarrier(2); 215 ExecutorService executor = Executors.newFixedThreadPool(2); 216 executor.execute(new Runnable() { 217 @Override 218 public void run() { 219 try { 220 assertNull(queue.poll(1000, TimeUnit.MILLISECONDS)); 221 threadsStarted.await(); 222 assertSame(testObj, queue.poll(1000, TimeUnit.MILLISECONDS)); 223 assertTrue(queue.isEmpty()); 224 } catch (Exception e) { 225 throw new RuntimeException(e); 226 } 227 } 228 }); 229 230 executor.execute(new Runnable() { 231 @Override 232 public void run() { 233 try { 234 threadsStarted.await(); 235 queue.offer(testObj); 236 } catch (Exception e) { 237 throw new RuntimeException(e); 238 } 239 } 240 }); 241 242 executor.shutdown(); 243 assertTrue(executor.awaitTermination(8000, TimeUnit.MILLISECONDS)); 244 } 245}