001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertThrows; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.List; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellComparatorImpl; 030import org.apache.hadoop.hbase.ExtendedCell; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.hadoop.hbase.testclassification.RegionServerTests; 034import org.apache.hadoop.hbase.testclassification.SmallTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.CollectionBackedScanner; 037import org.junit.ClassRule; 038import org.junit.Test; 039import org.junit.experimental.categories.Category; 040 041@Category({ RegionServerTests.class, SmallTests.class }) 042public class TestKeyValueHeap { 043 044 @ClassRule 045 public static final HBaseClassTestRule CLASS_RULE = 046 HBaseClassTestRule.forClass(TestKeyValueHeap.class); 047 048 private byte[] row1 = Bytes.toBytes("row1"); 049 private byte[] fam1 = Bytes.toBytes("fam1"); 050 private byte[] col1 = Bytes.toBytes("col1"); 051 private byte[] data = Bytes.toBytes("data"); 052 053 private byte[] row2 = Bytes.toBytes("row2"); 054 private byte[] fam2 = Bytes.toBytes("fam2"); 055 private byte[] col2 = Bytes.toBytes("col2"); 056 057 private byte[] col3 = Bytes.toBytes("col3"); 058 private byte[] col4 = Bytes.toBytes("col4"); 059 private byte[] col5 = Bytes.toBytes("col5"); 060 061 // Variable name encoding. kv<row#><fam#><col#> 062 ExtendedCell kv111 = new KeyValue(row1, fam1, col1, data); 063 ExtendedCell kv112 = new KeyValue(row1, fam1, col2, data); 064 ExtendedCell kv113 = new KeyValue(row1, fam1, col3, data); 065 ExtendedCell kv114 = new KeyValue(row1, fam1, col4, data); 066 ExtendedCell kv115 = new KeyValue(row1, fam1, col5, data); 067 ExtendedCell kv121 = new KeyValue(row1, fam2, col1, data); 068 ExtendedCell kv122 = new KeyValue(row1, fam2, col2, data); 069 ExtendedCell kv211 = new KeyValue(row2, fam1, col1, data); 070 ExtendedCell kv212 = new KeyValue(row2, fam1, col2, data); 071 ExtendedCell kv213 = new KeyValue(row2, fam1, col3, data); 072 073 TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212)); 074 TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112)); 075 TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213)); 076 077 List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3)); 078 079 /* 080 * Uses {@code scanners} to build a KeyValueHeap, iterates over it and asserts that returned Cells 081 * are same as {@code expected}. 082 * @return List of Cells returned from scanners. 083 */ 084 public List<Cell> assertCells(List<Cell> expected, List<KeyValueScanner> scanners) 085 throws IOException { 086 // Creating KeyValueHeap 087 try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) { 088 List<Cell> actual = new ArrayList<>(); 089 while (kvh.peek() != null) { 090 actual.add(kvh.next()); 091 } 092 093 assertEquals(expected, actual); 094 return actual; 095 } 096 } 097 098 @Test 099 public void testSorted() throws IOException { 100 // Cases that need to be checked are: 101 // 1. The "smallest" Cell is in the same scanners as current 102 // 2. Current scanner gets empty 103 104 List<Cell> expected = 105 Arrays.asList(kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213); 106 107 List<Cell> actual = assertCells(expected, scanners); 108 109 // Check if result is sorted according to Comparator 110 for (int i = 0; i < actual.size() - 1; i++) { 111 int ret = CellComparatorImpl.COMPARATOR.compare(actual.get(i), actual.get(i + 1)); 112 assertTrue(ret < 0); 113 } 114 } 115 116 @Test 117 public void testSeek() throws IOException { 118 // Cases: 119 // 1. Seek Cell that is not in scanner 120 // 2. Check that smallest that is returned from a seek is correct 121 List<Cell> expected = Arrays.asList(kv211); 122 123 // Creating KeyValueHeap 124 try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) { 125 ExtendedCell seekKv = new KeyValue(row2, fam1, null, null); 126 kvh.seek(seekKv); 127 128 List<Cell> actual = Arrays.asList(kvh.peek()); 129 130 assertEquals("Expected = " + Arrays.toString(expected.toArray()) + "\n Actual = " 131 + Arrays.toString(actual.toArray()), expected, actual); 132 } 133 } 134 135 @Test 136 public void testScannerLeak() throws IOException { 137 // Test for unclosed scanners (HBASE-1927) 138 139 TestScanner s4 = new TestScanner(new ArrayList<>()); 140 scanners.add(s4); 141 142 // Creating KeyValueHeap 143 try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) { 144 for (;;) { 145 if (kvh.next() == null) { 146 break; 147 } 148 } 149 // Once the internal scanners go out of Cells, those will be removed from KVHeap's priority 150 // queue and added to a Set for lazy close. The actual close will happen only on 151 // KVHeap#close() 152 assertEquals(4, kvh.scannersForDelayedClose.size()); 153 assertTrue(kvh.scannersForDelayedClose.contains(s1)); 154 assertTrue(kvh.scannersForDelayedClose.contains(s2)); 155 assertTrue(kvh.scannersForDelayedClose.contains(s3)); 156 assertTrue(kvh.scannersForDelayedClose.contains(s4)); 157 } 158 159 for (KeyValueScanner scanner : scanners) { 160 assertTrue(((TestScanner) scanner).isClosed()); 161 } 162 } 163 164 @Test 165 public void testScannerException() throws IOException { 166 // Test for NPE issue when exception happens in scanners (HBASE-13835) 167 168 TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212)); 169 TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112)); 170 TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213)); 171 TestScanner s4 = new SeekTestScanner(new ArrayList<>()); 172 173 List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3, s4)); 174 175 // Creating KeyValueHeap 176 try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) { 177 for (KeyValueScanner scanner : scanners) { 178 ((SeekTestScanner) scanner).setRealSeekDone(false); 179 } 180 // The pollRealKV should throw IOE. 181 assertThrows(IOException.class, () -> { 182 for (;;) { 183 if (kvh.next() == null) { 184 break; 185 } 186 } 187 }); 188 } 189 // It implies there is no NPE thrown from kvh.close() if getting here 190 for (KeyValueScanner scanner : scanners) { 191 // Verify that close is called and only called once for each scanner 192 assertTrue(((SeekTestScanner) scanner).isClosed()); 193 assertEquals(1, ((SeekTestScanner) scanner).getClosedNum()); 194 } 195 } 196 197 @Test 198 public void testPriorityId() throws IOException { 199 ExtendedCell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa")); 200 ExtendedCell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb")); 201 TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 1); 202 TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2); 203 List<Cell> expected = Arrays.asList(kv111, kv112, kv113B, kv113A); 204 assertCells(expected, Arrays.asList(scan1, scan2)); 205 206 scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 2); 207 scan2 = new TestScanner(Arrays.asList(kv113B), 1); 208 expected = Arrays.asList(kv111, kv112, kv113A, kv113B); 209 assertCells(expected, Arrays.asList(scan1, scan2)); 210 } 211 212 private static class TestScanner extends CollectionBackedScanner { 213 private boolean closed = false; 214 private long scannerOrder = 0; 215 216 public TestScanner(List<ExtendedCell> list) { 217 super(list); 218 } 219 220 public TestScanner(List<ExtendedCell> list, long scannerOrder) { 221 this(list); 222 this.scannerOrder = scannerOrder; 223 } 224 225 @Override 226 public long getScannerOrder() { 227 return scannerOrder; 228 } 229 230 @Override 231 public void close() { 232 closed = true; 233 } 234 235 public boolean isClosed() { 236 return closed; 237 } 238 } 239 240 private static class SeekTestScanner extends TestScanner { 241 private int closedNum = 0; 242 private boolean realSeekDone = true; 243 244 public SeekTestScanner(List<ExtendedCell> list) { 245 super(list); 246 } 247 248 @Override 249 public void close() { 250 super.close(); 251 closedNum++; 252 } 253 254 public int getClosedNum() { 255 return closedNum; 256 } 257 258 @Override 259 public boolean realSeekDone() { 260 return realSeekDone; 261 } 262 263 public void setRealSeekDone(boolean done) { 264 realSeekDone = done; 265 } 266 267 @Override 268 public void enforceSeek() throws IOException { 269 throw new IOException("enforceSeek must not be called on a " + "non-lazy scanner"); 270 } 271 } 272}