001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertSame;
024import static org.junit.Assert.assertTrue;
025
026import java.util.Comparator;
027import java.util.PriorityQueue;
028import java.util.concurrent.CyclicBarrier;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.TimeUnit;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.testclassification.MiscTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.junit.After;
036import org.junit.Before;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040
041@Category({ MiscTests.class, SmallTests.class })
042public class TestBoundedPriorityBlockingQueue {
043
044  @ClassRule
045  public static final HBaseClassTestRule CLASS_RULE =
046    HBaseClassTestRule.forClass(TestBoundedPriorityBlockingQueue.class);
047
048  private final static int CAPACITY = 16;
049
050  static class TestObject {
051    private final int priority;
052    private final int seqId;
053
054    public TestObject(final int priority, final int seqId) {
055      this.priority = priority;
056      this.seqId = seqId;
057    }
058
059    public int getSeqId() {
060      return this.seqId;
061    }
062
063    public int getPriority() {
064      return this.priority;
065    }
066  }
067
068  static class TestObjectComparator implements Comparator<TestObject> {
069    public TestObjectComparator() {
070    }
071
072    @Override
073    public int compare(TestObject a, TestObject b) {
074      return a.getPriority() - b.getPriority();
075    }
076  }
077
078  private BoundedPriorityBlockingQueue<TestObject> queue;
079
080  @Before
081  public void setUp() throws Exception {
082    this.queue = new BoundedPriorityBlockingQueue<>(CAPACITY, new TestObjectComparator());
083  }
084
085  @After
086  public void tearDown() throws Exception {
087  }
088
089  @Test
090  public void tesAppend() throws Exception {
091    // Push
092    for (int i = 1; i <= CAPACITY; ++i) {
093      assertTrue(queue.offer(new TestObject(i, i)));
094      assertEquals(i, queue.size());
095      assertEquals(CAPACITY - i, queue.remainingCapacity());
096    }
097    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
098
099    // Pop
100    for (int i = 1; i <= CAPACITY; ++i) {
101      TestObject obj = queue.poll();
102      assertEquals(i, obj.getSeqId());
103      assertEquals(CAPACITY - i, queue.size());
104      assertEquals(i, queue.remainingCapacity());
105    }
106    assertEquals(null, queue.poll());
107  }
108
109  @Test
110  public void tesAppendSamePriority() throws Exception {
111    // Push
112    for (int i = 1; i <= CAPACITY; ++i) {
113      assertTrue(queue.offer(new TestObject(0, i)));
114      assertEquals(i, queue.size());
115      assertEquals(CAPACITY - i, queue.remainingCapacity());
116    }
117    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
118
119    // Pop
120    for (int i = 1; i <= CAPACITY; ++i) {
121      TestObject obj = queue.poll();
122      assertEquals(i, obj.getSeqId());
123      assertEquals(CAPACITY - i, queue.size());
124      assertEquals(i, queue.remainingCapacity());
125    }
126    assertEquals(null, queue.poll());
127  }
128
129  @Test
130  public void testPrepend() throws Exception {
131    // Push
132    for (int i = 1; i <= CAPACITY; ++i) {
133      assertTrue(queue.offer(new TestObject(CAPACITY - i, i)));
134      assertEquals(i, queue.size());
135      assertEquals(CAPACITY - i, queue.remainingCapacity());
136    }
137
138    // Pop
139    for (int i = 1; i <= CAPACITY; ++i) {
140      TestObject obj = queue.poll();
141      assertEquals(CAPACITY - (i - 1), obj.getSeqId());
142      assertEquals(CAPACITY - i, queue.size());
143      assertEquals(i, queue.remainingCapacity());
144    }
145    assertEquals(null, queue.poll());
146  }
147
148  @Test
149  public void testInsert() throws Exception {
150    // Push
151    for (int i = 1; i <= CAPACITY; i += 2) {
152      assertTrue(queue.offer(new TestObject(i, i)));
153      assertEquals((1 + i) / 2, queue.size());
154    }
155    for (int i = 2; i <= CAPACITY; i += 2) {
156      assertTrue(queue.offer(new TestObject(i, i)));
157      assertEquals(CAPACITY / 2 + (i / 2), queue.size());
158    }
159    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
160
161    // Pop
162    for (int i = 1; i <= CAPACITY; ++i) {
163      TestObject obj = queue.poll();
164      assertEquals(i, obj.getSeqId());
165      assertEquals(CAPACITY - i, queue.size());
166      assertEquals(i, queue.remainingCapacity());
167    }
168    assertEquals(null, queue.poll());
169  }
170
171  @Test
172  public void testFifoSamePriority() throws Exception {
173    assertTrue(CAPACITY >= 6);
174    for (int i = 0; i < 6; ++i) {
175      assertTrue(queue.offer(new TestObject((1 + (i % 2)) * 10, i)));
176    }
177
178    for (int i = 0; i < 6; i += 2) {
179      TestObject obj = queue.poll();
180      assertEquals(10, obj.getPriority());
181      assertEquals(i, obj.getSeqId());
182    }
183
184    for (int i = 1; i < 6; i += 2) {
185      TestObject obj = queue.poll();
186      assertEquals(20, obj.getPriority());
187      assertEquals(i, obj.getSeqId());
188    }
189    assertEquals(null, queue.poll());
190  }
191
192  @Test
193  public void testPoll() {
194    assertNull(queue.poll());
195    PriorityQueue<TestObject> testList = new PriorityQueue<>(CAPACITY, new TestObjectComparator());
196
197    for (int i = 0; i < CAPACITY; ++i) {
198      TestObject obj = new TestObject(i, i);
199      testList.add(obj);
200      queue.offer(obj);
201    }
202
203    for (int i = 0; i < CAPACITY; ++i) {
204      assertEquals(testList.poll(), queue.poll());
205    }
206
207    assertNull(null, queue.poll());
208  }
209
210  @Test
211  public void testPollInExecutor() throws InterruptedException {
212    final TestObject testObj = new TestObject(0, 0);
213
214    final CyclicBarrier threadsStarted = new CyclicBarrier(2);
215    ExecutorService executor = Executors.newFixedThreadPool(2);
216    executor.execute(new Runnable() {
217      @Override
218      public void run() {
219        try {
220          assertNull(queue.poll(1000, TimeUnit.MILLISECONDS));
221          threadsStarted.await();
222          assertSame(testObj, queue.poll(1000, TimeUnit.MILLISECONDS));
223          assertTrue(queue.isEmpty());
224        } catch (Exception e) {
225          throw new RuntimeException(e);
226        }
227      }
228    });
229
230    executor.execute(new Runnable() {
231      @Override
232      public void run() {
233        try {
234          threadsStarted.await();
235          queue.offer(testObj);
236        } catch (Exception e) {
237          throw new RuntimeException(e);
238        }
239      }
240    });
241
242    executor.shutdown();
243    assertTrue(executor.awaitTermination(8000, TimeUnit.MILLISECONDS));
244  }
245}