001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.*; 021 022import java.io.IOException; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.List; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellUtil; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtil; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.testclassification.ClientTests; 032import org.apache.hadoop.hbase.testclassification.LargeTests; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.junit.After; 035import org.junit.AfterClass; 036import org.junit.Before; 037import org.junit.BeforeClass; 038import org.junit.ClassRule; 039import org.junit.Rule; 040import org.junit.Test; 041import org.junit.experimental.categories.Category; 042import org.junit.rules.TestName; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * Run tests related to {@link org.apache.hadoop.hbase.filter.TimestampsFilter} using HBase client 048 * APIs. Sets up the HBase mini cluster once at start. Each creates a table named for the method and 049 * does its stuff against that. 050 */ 051@Category({ LargeTests.class, ClientTests.class }) 052public class TestMultipleTimestamps { 053 054 @ClassRule 055 public static final HBaseClassTestRule CLASS_RULE = 056 HBaseClassTestRule.forClass(TestMultipleTimestamps.class); 057 058 private static final Logger LOG = LoggerFactory.getLogger(TestMultipleTimestamps.class); 059 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 060 061 @Rule 062 public TestName name = new TestName(); 063 064 /** 065 * @throws java.lang.Exception 066 */ 067 @BeforeClass 068 public static void setUpBeforeClass() throws Exception { 069 TEST_UTIL.startMiniCluster(); 070 } 071 072 /** 073 * @throws java.lang.Exception 074 */ 075 @AfterClass 076 public static void tearDownAfterClass() throws Exception { 077 TEST_UTIL.shutdownMiniCluster(); 078 } 079 080 /** 081 * @throws java.lang.Exception 082 */ 083 @Before 084 public void setUp() throws Exception { 085 // Nothing to do. 086 } 087 088 /** 089 * @throws java.lang.Exception 090 */ 091 @After 092 public void tearDown() throws Exception { 093 // Nothing to do. 094 } 095 096 @Test 097 public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException { 098 final TableName tableName = TableName.valueOf(name.getMethodName()); 099 byte[] FAMILY = Bytes.toBytes("event_log"); 100 byte[][] FAMILIES = new byte[][] { FAMILY }; 101 102 // create table; set versions to max... 103 Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE); 104 105 Integer[] putRows = new Integer[] { 1, 3, 5, 7 }; 106 Integer[] putColumns = new Integer[] { 1, 3, 5 }; 107 Long[] putTimestamps = new Long[] { 1L, 2L, 3L, 4L, 5L }; 108 109 Integer[] scanRows = new Integer[] { 3, 5 }; 110 Integer[] scanColumns = new Integer[] { 3 }; 111 Long[] scanTimestamps = new Long[] { 3L, 4L }; 112 int scanMaxVersions = 2; 113 114 put(ht, FAMILY, putRows, putColumns, putTimestamps); 115 116 TEST_UTIL.flush(tableName); 117 118 ResultScanner scanner = 119 scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions); 120 121 Cell[] kvs; 122 123 kvs = scanner.next().rawCells(); 124 assertEquals(2, kvs.length); 125 checkOneCell(kvs[0], FAMILY, 3, 3, 4); 126 checkOneCell(kvs[1], FAMILY, 3, 3, 3); 127 kvs = scanner.next().rawCells(); 128 assertEquals(2, kvs.length); 129 checkOneCell(kvs[0], FAMILY, 5, 3, 4); 130 checkOneCell(kvs[1], FAMILY, 5, 3, 3); 131 132 ht.close(); 133 } 134 135 @Test 136 public void testReseeksWithMultipleColumnOneTimestamp() throws IOException { 137 LOG.info(name.getMethodName()); 138 final TableName tableName = TableName.valueOf(name.getMethodName()); 139 byte[] FAMILY = Bytes.toBytes("event_log"); 140 byte[][] FAMILIES = new byte[][] { FAMILY }; 141 142 // create table; set versions to max... 143 Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE); 144 145 Integer[] putRows = new Integer[] { 1, 3, 5, 7 }; 146 Integer[] putColumns = new Integer[] { 1, 3, 5 }; 147 Long[] putTimestamps = new Long[] { 1L, 2L, 3L, 4L, 5L }; 148 149 Integer[] scanRows = new Integer[] { 3, 5 }; 150 Integer[] scanColumns = new Integer[] { 3, 4 }; 151 Long[] scanTimestamps = new Long[] { 3L }; 152 int scanMaxVersions = 2; 153 154 put(ht, FAMILY, putRows, putColumns, putTimestamps); 155 156 TEST_UTIL.flush(tableName); 157 158 ResultScanner scanner = 159 scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions); 160 161 Cell[] kvs; 162 163 kvs = scanner.next().rawCells(); 164 assertEquals(1, kvs.length); 165 checkOneCell(kvs[0], FAMILY, 3, 3, 3); 166 kvs = scanner.next().rawCells(); 167 assertEquals(1, kvs.length); 168 checkOneCell(kvs[0], FAMILY, 5, 3, 3); 169 170 ht.close(); 171 } 172 173 @Test 174 public void testReseeksWithMultipleColumnMultipleTimestamp() throws IOException { 175 LOG.info(name.getMethodName()); 176 177 final TableName tableName = TableName.valueOf(name.getMethodName()); 178 byte[] FAMILY = Bytes.toBytes("event_log"); 179 byte[][] FAMILIES = new byte[][] { FAMILY }; 180 181 // create table; set versions to max... 182 Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE); 183 184 Integer[] putRows = new Integer[] { 1, 3, 5, 7 }; 185 Integer[] putColumns = new Integer[] { 1, 3, 5 }; 186 Long[] putTimestamps = new Long[] { 1L, 2L, 3L, 4L, 5L }; 187 188 Integer[] scanRows = new Integer[] { 5, 7 }; 189 Integer[] scanColumns = new Integer[] { 3, 4, 5 }; 190 Long[] scanTimestamps = new Long[] { 2L, 3L }; 191 int scanMaxVersions = 2; 192 193 put(ht, FAMILY, putRows, putColumns, putTimestamps); 194 195 TEST_UTIL.flush(tableName); 196 Scan scan = new Scan(); 197 scan.readVersions(10); 198 ResultScanner scanner = ht.getScanner(scan); 199 while (true) { 200 Result r = scanner.next(); 201 if (r == null) break; 202 LOG.info("r=" + r); 203 } 204 scanner = scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions); 205 206 Cell[] kvs; 207 208 // This looks like wrong answer. Should be 2. Even then we are returning wrong result, 209 // timestamps that are 3 whereas should be 2 since min is inclusive. 210 kvs = scanner.next().rawCells(); 211 assertEquals(4, kvs.length); 212 checkOneCell(kvs[0], FAMILY, 5, 3, 3); 213 checkOneCell(kvs[1], FAMILY, 5, 3, 2); 214 checkOneCell(kvs[2], FAMILY, 5, 5, 3); 215 checkOneCell(kvs[3], FAMILY, 5, 5, 2); 216 kvs = scanner.next().rawCells(); 217 assertEquals(4, kvs.length); 218 checkOneCell(kvs[0], FAMILY, 7, 3, 3); 219 checkOneCell(kvs[1], FAMILY, 7, 3, 2); 220 checkOneCell(kvs[2], FAMILY, 7, 5, 3); 221 checkOneCell(kvs[3], FAMILY, 7, 5, 2); 222 223 ht.close(); 224 } 225 226 @Test 227 public void testReseeksWithMultipleFiles() throws IOException { 228 LOG.info(name.getMethodName()); 229 final TableName tableName = TableName.valueOf(name.getMethodName()); 230 byte[] FAMILY = Bytes.toBytes("event_log"); 231 byte[][] FAMILIES = new byte[][] { FAMILY }; 232 233 // create table; set versions to max... 234 Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE); 235 236 Integer[] putRows1 = new Integer[] { 1, 2, 3 }; 237 Integer[] putColumns1 = new Integer[] { 2, 5, 6 }; 238 Long[] putTimestamps1 = new Long[] { 1L, 2L, 5L }; 239 240 Integer[] putRows2 = new Integer[] { 6, 7 }; 241 Integer[] putColumns2 = new Integer[] { 3, 6 }; 242 Long[] putTimestamps2 = new Long[] { 4L, 5L }; 243 244 Integer[] putRows3 = new Integer[] { 2, 3, 5 }; 245 Integer[] putColumns3 = new Integer[] { 1, 2, 3 }; 246 Long[] putTimestamps3 = new Long[] { 4L, 8L }; 247 248 Integer[] scanRows = new Integer[] { 3, 5, 7 }; 249 Integer[] scanColumns = new Integer[] { 3, 4, 5 }; 250 Long[] scanTimestamps = new Long[] { 2L, 4L }; 251 int scanMaxVersions = 5; 252 253 put(ht, FAMILY, putRows1, putColumns1, putTimestamps1); 254 TEST_UTIL.flush(tableName); 255 put(ht, FAMILY, putRows2, putColumns2, putTimestamps2); 256 TEST_UTIL.flush(tableName); 257 put(ht, FAMILY, putRows3, putColumns3, putTimestamps3); 258 259 ResultScanner scanner = 260 scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions); 261 262 Cell[] kvs; 263 264 kvs = scanner.next().rawCells(); 265 assertEquals(2, kvs.length); 266 checkOneCell(kvs[0], FAMILY, 3, 3, 4); 267 checkOneCell(kvs[1], FAMILY, 3, 5, 2); 268 269 kvs = scanner.next().rawCells(); 270 assertEquals(1, kvs.length); 271 checkOneCell(kvs[0], FAMILY, 5, 3, 4); 272 273 kvs = scanner.next().rawCells(); 274 assertEquals(1, kvs.length); 275 checkOneCell(kvs[0], FAMILY, 6, 3, 4); 276 277 kvs = scanner.next().rawCells(); 278 assertEquals(1, kvs.length); 279 checkOneCell(kvs[0], FAMILY, 7, 3, 4); 280 281 ht.close(); 282 } 283 284 @Test 285 public void testWithVersionDeletes() throws Exception { 286 287 // first test from memstore (without flushing). 288 testWithVersionDeletes(false); 289 290 // run same test against HFiles (by forcing a flush). 291 testWithVersionDeletes(true); 292 } 293 294 public void testWithVersionDeletes(boolean flushTables) throws IOException { 295 LOG.info(name.getMethodName() + "_" + (flushTables ? "flush" : "noflush")); 296 final TableName tableName = 297 TableName.valueOf(name.getMethodName() + "_" + (flushTables ? "flush" : "noflush")); 298 byte[] FAMILY = Bytes.toBytes("event_log"); 299 byte[][] FAMILIES = new byte[][] { FAMILY }; 300 301 // create table; set versions to max... 302 Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE); 303 304 // For row:0, col:0: insert versions 1 through 5. 305 putNVersions(ht, FAMILY, 0, 0, 1, 5); 306 307 if (flushTables) { 308 TEST_UTIL.flush(tableName); 309 } 310 311 // delete version 4. 312 deleteOneVersion(ht, FAMILY, 0, 0, 4); 313 314 // request a bunch of versions including the deleted version. We should 315 // only get back entries for the versions that exist. 316 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L)); 317 assertEquals(3, kvs.length); 318 checkOneCell(kvs[0], FAMILY, 0, 0, 5); 319 checkOneCell(kvs[1], FAMILY, 0, 0, 3); 320 checkOneCell(kvs[2], FAMILY, 0, 0, 2); 321 322 ht.close(); 323 } 324 325 @Test 326 public void testWithMultipleVersionDeletes() throws IOException { 327 LOG.info(name.getMethodName()); 328 329 final TableName tableName = TableName.valueOf(name.getMethodName()); 330 byte[] FAMILY = Bytes.toBytes("event_log"); 331 byte[][] FAMILIES = new byte[][] { FAMILY }; 332 333 // create table; set versions to max... 334 Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE); 335 336 // For row:0, col:0: insert versions 1 through 5. 337 putNVersions(ht, FAMILY, 0, 0, 1, 5); 338 339 TEST_UTIL.flush(tableName); 340 341 // delete all versions before 4. 342 deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4); 343 344 // request a bunch of versions including the deleted version. We should 345 // only get back entries for the versions that exist. 346 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L)); 347 assertEquals(0, kvs.length); 348 349 ht.close(); 350 } 351 352 @Test 353 public void testWithColumnDeletes() throws IOException { 354 final TableName tableName = TableName.valueOf(name.getMethodName()); 355 byte[] FAMILY = Bytes.toBytes("event_log"); 356 byte[][] FAMILIES = new byte[][] { FAMILY }; 357 358 // create table; set versions to max... 359 Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE); 360 361 // For row:0, col:0: insert versions 1 through 5. 362 putNVersions(ht, FAMILY, 0, 0, 1, 5); 363 364 TEST_UTIL.flush(tableName); 365 366 // delete all versions before 4. 367 deleteColumn(ht, FAMILY, 0, 0); 368 369 // request a bunch of versions including the deleted version. We should 370 // only get back entries for the versions that exist. 371 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L)); 372 assertEquals(0, kvs.length); 373 374 ht.close(); 375 } 376 377 @Test 378 public void testWithFamilyDeletes() throws IOException { 379 final TableName tableName = TableName.valueOf(name.getMethodName()); 380 byte[] FAMILY = Bytes.toBytes("event_log"); 381 byte[][] FAMILIES = new byte[][] { FAMILY }; 382 383 // create table; set versions to max... 384 Table ht = TEST_UTIL.createTable(tableName, FAMILIES, Integer.MAX_VALUE); 385 386 // For row:0, col:0: insert versions 1 through 5. 387 putNVersions(ht, FAMILY, 0, 0, 1, 5); 388 389 TEST_UTIL.flush(tableName); 390 391 // delete all versions before 4. 392 deleteFamily(ht, FAMILY, 0); 393 394 // request a bunch of versions including the deleted version. We should 395 // only get back entries for the versions that exist. 396 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L)); 397 assertEquals(0, kvs.length); 398 399 ht.close(); 400 } 401 402 /** 403 * Assert that the passed in KeyValue has expected contents for the specified row, column & 404 * timestamp. 405 */ 406 private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) { 407 408 String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts; 409 410 assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx, 411 Bytes.toString(CellUtil.cloneRow(kv))); 412 413 assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf), 414 Bytes.toString(CellUtil.cloneFamily(kv))); 415 416 assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx, 417 Bytes.toString(CellUtil.cloneQualifier(kv))); 418 419 assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp()); 420 421 assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts, 422 Bytes.toString(CellUtil.cloneValue(kv))); 423 } 424 425 /** 426 * Uses the TimestampFilter on a Get to request a specified list of versions for the row/column 427 * specified by rowIdx & colIdx. 428 */ 429 private Cell[] getNVersions(Table ht, byte[] cf, int rowIdx, int colIdx, List<Long> versions) 430 throws IOException { 431 byte row[] = Bytes.toBytes("row:" + rowIdx); 432 byte column[] = Bytes.toBytes("column:" + colIdx); 433 Get get = new Get(row); 434 get.addColumn(cf, column); 435 get.readAllVersions(); 436 get.setTimeRange(Collections.min(versions), Collections.max(versions) + 1); 437 Result result = ht.get(get); 438 439 return result.rawCells(); 440 } 441 442 private ResultScanner scan(Table ht, byte[] cf, Integer[] rowIndexes, Integer[] columnIndexes, 443 Long[] versions, int maxVersions) throws IOException { 444 byte startRow[] = Bytes.toBytes("row:" + Collections.min(Arrays.asList(rowIndexes))); 445 byte endRow[] = Bytes.toBytes("row:" + Collections.max(Arrays.asList(rowIndexes)) + 1); 446 Scan scan = new Scan().withStartRow(startRow).withStopRow(endRow); 447 for (Integer colIdx : columnIndexes) { 448 byte column[] = Bytes.toBytes("column:" + colIdx); 449 scan.addColumn(cf, column); 450 } 451 scan.readVersions(maxVersions); 452 scan.setTimeRange(Collections.min(Arrays.asList(versions)), 453 Collections.max(Arrays.asList(versions)) + 1); 454 ResultScanner scanner = ht.getScanner(scan); 455 return scanner; 456 } 457 458 private void put(Table ht, byte[] cf, Integer[] rowIndexes, Integer[] columnIndexes, 459 Long[] versions) throws IOException { 460 for (int rowIdx : rowIndexes) { 461 byte row[] = Bytes.toBytes("row:" + rowIdx); 462 Put put = new Put(row); 463 put.setDurability(Durability.SKIP_WAL); 464 for (int colIdx : columnIndexes) { 465 byte column[] = Bytes.toBytes("column:" + colIdx); 466 for (long version : versions) { 467 put.addColumn(cf, column, version, Bytes.toBytes("value-version-" + version)); 468 } 469 } 470 ht.put(put); 471 } 472 } 473 474 /** 475 * Insert in specific row/column versions with timestamps versionStart..versionEnd. 476 */ 477 private void putNVersions(Table ht, byte[] cf, int rowIdx, int colIdx, long versionStart, 478 long versionEnd) throws IOException { 479 byte row[] = Bytes.toBytes("row:" + rowIdx); 480 byte column[] = Bytes.toBytes("column:" + colIdx); 481 Put put = new Put(row); 482 put.setDurability(Durability.SKIP_WAL); 483 484 for (long idx = versionStart; idx <= versionEnd; idx++) { 485 put.addColumn(cf, column, idx, Bytes.toBytes("value-version-" + idx)); 486 } 487 488 ht.put(put); 489 } 490 491 /** 492 * For row/column specified by rowIdx/colIdx, delete the cell corresponding to the specified 493 * version. 494 */ 495 private void deleteOneVersion(Table ht, byte[] cf, int rowIdx, int colIdx, long version) 496 throws IOException { 497 byte row[] = Bytes.toBytes("row:" + rowIdx); 498 byte column[] = Bytes.toBytes("column:" + colIdx); 499 Delete del = new Delete(row); 500 del.addColumn(cf, column, version); 501 ht.delete(del); 502 } 503 504 /** 505 * For row/column specified by rowIdx/colIdx, delete all cells preceeding the specified version. 506 */ 507 private void deleteAllVersionsBefore(Table ht, byte[] cf, int rowIdx, int colIdx, long version) 508 throws IOException { 509 byte row[] = Bytes.toBytes("row:" + rowIdx); 510 byte column[] = Bytes.toBytes("column:" + colIdx); 511 Delete del = new Delete(row); 512 del.addColumns(cf, column, version); 513 ht.delete(del); 514 } 515 516 private void deleteColumn(Table ht, byte[] cf, int rowIdx, int colIdx) throws IOException { 517 byte row[] = Bytes.toBytes("row:" + rowIdx); 518 byte column[] = Bytes.toBytes("column:" + colIdx); 519 Delete del = new Delete(row); 520 del.addColumns(cf, column); 521 ht.delete(del); 522 } 523 524 private void deleteFamily(Table ht, byte[] cf, int rowIdx) throws IOException { 525 byte row[] = Bytes.toBytes("row:" + rowIdx); 526 Delete del = new Delete(row); 527 del.addFamily(cf); 528 ht.delete(del); 529 } 530 531}