/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestScannerWithBulkload {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestScannerWithBulkload.class);

  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(1);
  }

  private static void createTable(Admin admin, TableName tableName) throws IOException {
    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
    ColumnFamilyDescriptor columnFamilyDescriptor =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("col")).setMaxVersions(3).build();
    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
    admin.createTable(tableDescriptorBuilder.build());
  }

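  // Verifies basic bulk load visibility: init() seeds 'version0'/'version1' for row1, an HFile
  // carrying 'version2' (marked with BULKLOAD_TIME_KEY) is bulk loaded, and the scanner must
  // return 'version2'; a later put of 'version3' followed by a flush must then be returned.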
  @Test
  public void testBulkLoad() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = EnvironmentEdgeManager.currentTime();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath =
      writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()).equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out
            .println(Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }

  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expectedVal)
    throws IOException {
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()).equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out
            .println(Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals(expectedVal,
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    return result;
  }

  // If nativeHFile is true, we will set cell seq id and MAX_SEQ_ID_KEY in the file.
  // Else, we will set BULKLOAD_TIME_KEY.
  private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
    throws IOException {
    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
    final Path hfilePath = new Path(hFilePath);
    fs.mkdirs(hfilePath);
    Path path = new Path(pathStr);
    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    Assert.assertNotNull(wf);
    HFileContext context = new HFileContextBuilder().build();
    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version2"));

    // Set cell seq id to test bulk load native hfiles.
    if (nativeHFile) {
      // Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip
      // this kv.
      kv.setSequenceId(9999999);
    }

    writer.append(kv);

    if (nativeHFile) {
      // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip its
      // kv.
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(9999999L));
    } else {
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
    }
    writer.close();
    return hfilePath;
  }

  private Table init(Admin admin, long l, Scan scan, TableName tableName) throws Exception {
    Table table = TEST_UTIL.getConnection().getTable(tableName);
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put0);
    admin.flush(tableName);
    Put put1 = new Put(Bytes.toBytes("row2"));
    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put1);
    admin.flush(tableName);
    put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version1")));
    table.put(put0);
    admin.flush(tableName);
    admin.compact(tableName);

    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
    Assert.assertEquals(1, cells.size());
    Cell _c = cells.get(0);
    Assert.assertEquals("version1",
      Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
    scanner.close();
    return table;
  }

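  // Verifies that a scanner opened before the bulk load (with caching=1, so rows are fetched
  // lazily) keeps its view: the load runs from a separate thread while the scan is in progress,
  // and row1 is still expected to resolve to 'version1'.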
  @Test
  public void testBulkLoadWithParallelScan() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final long l = EnvironmentEdgeManager.currentTime();
    final Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    scan.setCaching(1);
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
      "/temp/testBulkLoadWithParallelScan/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final BulkLoadHFiles bulkload = BulkLoadHFiles.create(conf);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // Create a scanner and then do bulk load
    final CountDownLatch latch = new CountDownLatch(1);
    new Thread() {
      @Override
      public void run() {
        try {
          Put put1 = new Put(Bytes.toBytes("row5"));
          put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
            Bytes.toBytes("version0")));
          table.put(put1);
          bulkload.bulkLoad(tableName, hfilePath);
          latch.countDown();
        } catch (TableNotFoundException e) {
        } catch (IOException e) {
        }
      }
    }.start();
    latch.await();
    // By the time we do next() the bulk loaded files are also added to the kv
    // scanner
    scanAfterBulkLoad(scanner, result, "version1");
    scanner.close();
    table.close();
  }

  @Test
  public void testBulkLoadNativeHFile() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = EnvironmentEdgeManager.currentTime();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
      "/temp/testBulkLoadNativeHFile/col/file", true);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // We had 'version0', 'version1' for 'row1,col:q' in the table.
    // Bulk load added 'version2'; the scanner should be able to see 'version2'.
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()).equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out
            .println(Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }

  private Scan createScan() {
    Scan scan = new Scan();
    scan.readVersions(3);
    return scan;
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
}