001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.Iterator; 028import java.util.List; 029import java.util.Map; 030import java.util.Random; 031import java.util.Set; 032import java.util.concurrent.ThreadLocalRandom; 033import java.util.concurrent.atomic.AtomicBoolean; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.FileUtil; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.ExtendedCell; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseTestingUtil; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.PrivateCellUtil; 044import org.apache.hadoop.hbase.Stoppable; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.TableNotFoundException; 047import org.apache.hadoop.hbase.client.Admin; 048import org.apache.hadoop.hbase.client.Connection; 049import org.apache.hadoop.hbase.client.ConnectionFactory; 050import org.apache.hadoop.hbase.client.Get; 051import org.apache.hadoop.hbase.client.RegionInfo; 052import org.apache.hadoop.hbase.client.RegionLocator; 053import org.apache.hadoop.hbase.client.Result; 054import org.apache.hadoop.hbase.client.ResultScanner; 055import org.apache.hadoop.hbase.client.RetriesExhaustedException; 056import org.apache.hadoop.hbase.client.Scan; 057import org.apache.hadoop.hbase.client.Table; 058import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; 059import org.apache.hadoop.hbase.testclassification.LargeTests; 060import org.apache.hadoop.hbase.testclassification.ReplicationTests; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.CommonFSUtils; 063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 064import org.apache.hadoop.hbase.util.HFileTestUtil; 065import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 066import org.junit.AfterClass; 067import org.junit.Assert; 068import org.junit.Before; 069import org.junit.BeforeClass; 070import org.junit.ClassRule; 071import org.junit.Test; 072import org.junit.experimental.categories.Category; 073import org.slf4j.Logger; 074import org.slf4j.LoggerFactory; 075 076import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 077 078import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey; 083 084@Category({ ReplicationTests.class, LargeTests.class }) 085public class TestReplicationSink { 086 087 @ClassRule 088 public static final HBaseClassTestRule CLASS_RULE = 089 HBaseClassTestRule.forClass(TestReplicationSink.class); 090 091 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSink.class); 092 private static final int BATCH_SIZE = 10; 093 094 protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 095 096 protected static ReplicationSink SINK; 097 098 protected static final TableName TABLE_NAME1 = TableName.valueOf("table1"); 099 protected static final TableName TABLE_NAME2 = TableName.valueOf("table2"); 100 101 protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1"); 102 protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2"); 103 104 protected static Table table1; 105 protected static Stoppable STOPPABLE = new Stoppable() { 106 final AtomicBoolean stop = new AtomicBoolean(false); 107 108 @Override 109 public boolean isStopped() { 110 return this.stop.get(); 111 } 112 113 @Override 114 public void stop(String why) { 115 LOG.info("STOPPING BECAUSE: " + why); 116 this.stop.set(true); 117 } 118 119 }; 120 121 protected static Table table2; 122 protected static String baseNamespaceDir; 123 protected static String hfileArchiveDir; 124 protected static String replicationClusterId; 125 126 /** 127 * @throws java.lang.Exception 128 */ 129 @BeforeClass 130 public static void setUpBeforeClass() throws Exception { 131 TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider", 132 TestSourceFSConfigurationProvider.class.getCanonicalName()); 133 TEST_UTIL.startMiniCluster(3); 134 RegionServerCoprocessorHost rsCpHost = 135 TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getRegionServerCoprocessorHost(); 136 SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), rsCpHost); 137 table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1); 138 table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2); 139 Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()); 140 baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)).toString(); 141 hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)).toString(); 142 replicationClusterId = "12345"; 143 } 144 145 /** 146 * @throws java.lang.Exception 147 */ 148 @AfterClass 149 public static void tearDownAfterClass() throws Exception { 150 STOPPABLE.stop("Shutting down"); 151 TEST_UTIL.shutdownMiniCluster(); 152 } 153 154 /** 155 * @throws java.lang.Exception 156 */ 157 @Before 158 public void setUp() throws Exception { 159 table1 = TEST_UTIL.deleteTableData(TABLE_NAME1); 160 table2 = TEST_UTIL.deleteTableData(TABLE_NAME2); 161 } 162 163 /** 164 * Insert a whole batch of entries 165 */ 166 @Test 167 public void testBatchSink() throws Exception { 168 List<WALEntry> entries = new ArrayList<>(BATCH_SIZE); 169 List<ExtendedCell> cells = new ArrayList<>(); 170 for (int i = 0; i < BATCH_SIZE; i++) { 171 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); 172 } 173 SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), 174 replicationClusterId, baseNamespaceDir, hfileArchiveDir); 175 Scan scan = new Scan(); 176 ResultScanner scanRes = table1.getScanner(scan); 177 assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length); 178 } 179 180 /** 181 * Insert a mix of puts and deletes 182 */ 183 @Test 184 public void testMixedPutDelete() throws Exception { 185 List<WALEntry> entries = new ArrayList<>(BATCH_SIZE / 2); 186 List<ExtendedCell> cells = new ArrayList<>(); 187 for (int i = 0; i < BATCH_SIZE / 2; i++) { 188 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); 189 } 190 SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells), 191 replicationClusterId, baseNamespaceDir, hfileArchiveDir); 192 193 entries = new ArrayList<>(BATCH_SIZE); 194 cells = new ArrayList<>(); 195 for (int i = 0; i < BATCH_SIZE; i++) { 196 entries.add(createEntry(TABLE_NAME1, i, 197 i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells)); 198 } 199 200 SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), 201 replicationClusterId, baseNamespaceDir, hfileArchiveDir); 202 Scan scan = new Scan(); 203 ResultScanner scanRes = table1.getScanner(scan); 204 assertEquals(BATCH_SIZE / 2, scanRes.next(BATCH_SIZE).length); 205 } 206 207 @Test 208 public void testLargeEditsPutDelete() throws Exception { 209 List<WALEntry> entries = new ArrayList<>(); 210 List<ExtendedCell> cells = new ArrayList<>(); 211 for (int i = 0; i < 5510; i++) { 212 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); 213 } 214 SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells), 215 replicationClusterId, baseNamespaceDir, hfileArchiveDir); 216 217 ResultScanner resultScanner = table1.getScanner(new Scan()); 218 int totalRows = 0; 219 while (resultScanner.next() != null) { 220 totalRows++; 221 } 222 assertEquals(5510, totalRows); 223 224 entries = new ArrayList<>(); 225 cells = new ArrayList<>(); 226 for (int i = 0; i < 11000; i++) { 227 entries.add(createEntry(TABLE_NAME1, i, 228 i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells)); 229 } 230 SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells), 231 replicationClusterId, baseNamespaceDir, hfileArchiveDir); 232 resultScanner = table1.getScanner(new Scan()); 233 totalRows = 0; 234 while (resultScanner.next() != null) { 235 totalRows++; 236 } 237 assertEquals(5500, totalRows); 238 } 239 240 /** 241 * Insert to 2 different tables 242 */ 243 @Test 244 public void testMixedPutTables() throws Exception { 245 List<WALEntry> entries = new ArrayList<>(BATCH_SIZE / 2); 246 List<ExtendedCell> cells = new ArrayList<>(); 247 for (int i = 0; i < BATCH_SIZE; i++) { 248 entries.add(createEntry(i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1, i, KeyValue.Type.Put, cells)); 249 } 250 251 SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), 252 replicationClusterId, baseNamespaceDir, hfileArchiveDir); 253 Scan scan = new Scan(); 254 ResultScanner scanRes = table2.getScanner(scan); 255 for (Result res : scanRes) { 256 assertEquals(0, Bytes.toInt(res.getRow()) % 2); 257 } 258 scanRes = table1.getScanner(scan); 259 for (Result res : scanRes) { 260 assertEquals(1, Bytes.toInt(res.getRow()) % 2); 261 } 262 } 263 264 /** 265 * Insert then do different types of deletes 266 */ 267 @Test 268 public void testMixedDeletes() throws Exception { 269 List<WALEntry> entries = new ArrayList<>(3); 270 List<ExtendedCell> cells = new ArrayList<>(); 271 for (int i = 0; i < 3; i++) { 272 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); 273 } 274 SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), 275 replicationClusterId, baseNamespaceDir, hfileArchiveDir); 276 entries = new ArrayList<>(3); 277 cells = new ArrayList<>(); 278 entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells)); 279 entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells)); 280 entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells)); 281 282 SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), 283 replicationClusterId, baseNamespaceDir, hfileArchiveDir); 284 285 Scan scan = new Scan(); 286 ResultScanner scanRes = table1.getScanner(scan); 287 assertEquals(0, scanRes.next(3).length); 288 } 289 290 /** 291 * Puts are buffered, but this tests when a delete (not-buffered) is applied before the actual Put 292 * that creates it. 293 */ 294 @Test 295 public void testApplyDeleteBeforePut() throws Exception { 296 List<WALEntry> entries = new ArrayList<>(5); 297 List<ExtendedCell> cells = new ArrayList<>(); 298 for (int i = 0; i < 2; i++) { 299 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); 300 } 301 entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells)); 302 for (int i = 3; i < 5; i++) { 303 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); 304 } 305 SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), 306 replicationClusterId, baseNamespaceDir, hfileArchiveDir); 307 Get get = new Get(Bytes.toBytes(1)); 308 Result res = table1.get(get); 309 assertEquals(0, res.size()); 310 } 311 312 @Test 313 public void testRethrowRetriesExhaustedException() throws Exception { 314 TableName notExistTable = TableName.valueOf("notExistTable"); 315 List<WALEntry> entries = new ArrayList<>(); 316 List<ExtendedCell> cells = new ArrayList<>(); 317 for (int i = 0; i < 10; i++) { 318 entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells)); 319 } 320 try { 321 SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), 322 replicationClusterId, baseNamespaceDir, hfileArchiveDir); 323 Assert.fail("Should re-throw TableNotFoundException."); 324 } catch (TableNotFoundException e) { 325 } 326 entries.clear(); 327 cells.clear(); 328 for (int i = 0; i < 10; i++) { 329 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); 330 } 331 try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { 332 try (Admin admin = conn.getAdmin()) { 333 admin.disableTable(TABLE_NAME1); 334 try { 335 SINK.replicateEntries(entries, 336 PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, 337 baseNamespaceDir, hfileArchiveDir); 338 Assert.fail("Should re-throw RetriesExhaustedWithDetailsException."); 339 } catch (RetriesExhaustedException e) { 340 } finally { 341 admin.enableTable(TABLE_NAME1); 342 } 343 } 344 } 345 } 346 347 /** 348 * Test replicateEntries with a bulk load entry for 25 HFiles 349 */ 350 @Test 351 public void testReplicateEntriesForHFiles() throws Exception { 352 Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries"); 353 Path familyDir = new Path(dir, Bytes.toString(FAM_NAME1)); 354 int numRows = 10; 355 List<Path> p = new ArrayList<>(1); 356 final String hfilePrefix = "hfile-"; 357 358 // 1. Generate 25 hfile ranges 359 Random rand = ThreadLocalRandom.current(); 360 Set<Integer> numbers = new HashSet<>(); 361 while (numbers.size() < 50) { 362 numbers.add(rand.nextInt(1000)); 363 } 364 List<Integer> numberList = new ArrayList<>(numbers); 365 Collections.sort(numberList); 366 Map<String, Long> storeFilesSize = new HashMap<>(1); 367 368 // 2. Create 25 hfiles 369 Configuration conf = TEST_UTIL.getConfiguration(); 370 FileSystem fs = dir.getFileSystem(conf); 371 Iterator<Integer> numbersItr = numberList.iterator(); 372 for (int i = 0; i < 25; i++) { 373 Path hfilePath = new Path(familyDir, hfilePrefix + i); 374 HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1, 375 Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows); 376 p.add(hfilePath); 377 storeFilesSize.put(hfilePath.getName(), fs.getFileStatus(hfilePath).getLen()); 378 } 379 380 // 3. Create a BulkLoadDescriptor and a WALEdit 381 Map<byte[], List<Path>> storeFiles = new HashMap<>(1); 382 storeFiles.put(FAM_NAME1, p); 383 org.apache.hadoop.hbase.wal.WALEdit edit = null; 384 WALProtos.BulkLoadDescriptor loadDescriptor = null; 385 386 try (Connection c = ConnectionFactory.createConnection(conf); 387 RegionLocator l = c.getRegionLocator(TABLE_NAME1)) { 388 RegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegion(); 389 loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1, 390 UnsafeByteOperations.unsafeWrap(regionInfo.getEncodedNameAsBytes()), storeFiles, 391 storeFilesSize, 1); 392 edit = org.apache.hadoop.hbase.wal.WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor); 393 } 394 List<WALEntry> entries = new ArrayList<>(1); 395 396 // 4. Create a WALEntryBuilder 397 WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1); 398 399 // 5. Copy the hfile to the path as it is in reality 400 for (int i = 0; i < 25; i++) { 401 String pathToHfileFromNS = new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString()) 402 .append(Path.SEPARATOR).append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR) 403 .append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray())) 404 .append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR) 405 .append(hfilePrefix + i).toString(); 406 String dst = baseNamespaceDir + Path.SEPARATOR + pathToHfileFromNS; 407 Path dstPath = new Path(dst); 408 FileUtil.copy(fs, p.get(0), fs, dstPath, false, conf); 409 } 410 411 entries.add(builder.build()); 412 try (ResultScanner scanner = table1.getScanner(new Scan())) { 413 // 6. Assert no existing data in table 414 assertEquals(0, scanner.next(numRows).length); 415 } 416 // 7. Replicate the bulk loaded entry 417 SINK.replicateEntries(entries, 418 PrivateCellUtil 419 .createExtendedCellScanner(WALEditInternalHelper.getExtendedCells(edit).iterator()), 420 replicationClusterId, baseNamespaceDir, hfileArchiveDir); 421 try (ResultScanner scanner = table1.getScanner(new Scan())) { 422 // 8. Assert data is replicated 423 assertEquals(numRows, scanner.next(numRows).length); 424 } 425 // Clean up the created hfiles or it will mess up subsequent tests 426 } 427 428 /** 429 * Test failure metrics produced for failed replication edits 430 */ 431 @Test 432 public void testFailedReplicationSinkMetrics() throws IOException { 433 long initialFailedBatches = SINK.getSinkMetrics().getFailedBatches(); 434 long errorCount = 0L; 435 List<WALEntry> entries = new ArrayList<>(BATCH_SIZE); 436 List<ExtendedCell> cells = new ArrayList<>(); 437 for (int i = 0; i < BATCH_SIZE; i++) { 438 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); 439 } 440 cells.clear(); // cause IndexOutOfBoundsException 441 try { 442 SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), 443 replicationClusterId, baseNamespaceDir, hfileArchiveDir); 444 Assert.fail("Should re-throw ArrayIndexOutOfBoundsException."); 445 } catch (ArrayIndexOutOfBoundsException e) { 446 errorCount++; 447 assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches()); 448 } 449 450 entries.clear(); 451 cells.clear(); 452 TableName notExistTable = TableName.valueOf("notExistTable"); // cause TableNotFoundException 453 for (int i = 0; i < BATCH_SIZE; i++) { 454 entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells)); 455 } 456 try { 457 SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), 458 replicationClusterId, baseNamespaceDir, hfileArchiveDir); 459 Assert.fail("Should re-throw TableNotFoundException."); 460 } catch (TableNotFoundException e) { 461 errorCount++; 462 assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches()); 463 } 464 465 entries.clear(); 466 cells.clear(); 467 for (int i = 0; i < BATCH_SIZE; i++) { 468 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); 469 } 470 // cause IOException in batch() 471 try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { 472 try (Admin admin = conn.getAdmin()) { 473 admin.disableTable(TABLE_NAME1); 474 try { 475 SINK.replicateEntries(entries, 476 PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, 477 baseNamespaceDir, hfileArchiveDir); 478 Assert.fail("Should re-throw IOException."); 479 } catch (IOException e) { 480 errorCount++; 481 assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches()); 482 } finally { 483 admin.enableTable(TABLE_NAME1); 484 } 485 } 486 } 487 } 488 489 private WALEntry createEntry(TableName table, int row, KeyValue.Type type, 490 List<ExtendedCell> cells) { 491 byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2; 492 byte[] rowBytes = Bytes.toBytes(row); 493 // Just make sure we don't get the same ts for two consecutive rows with 494 // same key 495 try { 496 Thread.sleep(1); 497 } catch (InterruptedException e) { 498 LOG.info("Was interrupted while sleep, meh", e); 499 } 500 final long now = EnvironmentEdgeManager.currentTime(); 501 KeyValue kv = null; 502 if (type.getCode() == KeyValue.Type.Put.getCode()) { 503 kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.Put, Bytes.toBytes(row)); 504 } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) { 505 kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.DeleteColumn); 506 } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) { 507 kv = new KeyValue(rowBytes, fam, null, now, KeyValue.Type.DeleteFamily); 508 } 509 WALEntry.Builder builder = createWALEntryBuilder(table); 510 cells.add(kv); 511 512 return builder.build(); 513 } 514 515 public static WALEntry.Builder createWALEntryBuilder(TableName table) { 516 WALEntry.Builder builder = WALEntry.newBuilder(); 517 builder.setAssociatedCellCount(1); 518 WALKey.Builder keyBuilder = WALKey.newBuilder(); 519 UUID.Builder uuidBuilder = UUID.newBuilder(); 520 uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits()); 521 uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits()); 522 keyBuilder.setClusterId(uuidBuilder.build()); 523 keyBuilder.setTableName(UnsafeByteOperations.unsafeWrap(table.getName())); 524 keyBuilder.setWriteTime(EnvironmentEdgeManager.currentTime()); 525 keyBuilder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(HConstants.EMPTY_BYTE_ARRAY)); 526 keyBuilder.setLogSequenceNumber(-1); 527 builder.setKey(keyBuilder.build()); 528 return builder; 529 } 530}