/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;

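/**
 * Unit tests for {@link ReplicationSink}: applying batches of replicated WAL entries (puts,
 * deletes, and bulk load events) to local tables, plus failure handling and sink metrics.
 */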
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSink {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSink.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSink.class);
  private static final int BATCH_SIZE = 10;

  protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  protected static ReplicationSink SINK;

  protected static final TableName TABLE_NAME1 = TableName.valueOf("table1");
  protected static final TableName TABLE_NAME2 = TableName.valueOf("table2");

  protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
  protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2");

  protected static Table table1;
  protected static Stoppable STOPPABLE = new Stoppable() {
    final AtomicBoolean stop = new AtomicBoolean(false);

    @Override
    public boolean isStopped() {
      return this.stop.get();
    }

    @Override
    public void stop(String why) {
      LOG.info("STOPPING BECAUSE: " + why);
      this.stop.set(true);
    }

  };

  protected static Table table2;
  protected static String baseNamespaceDir;
  protected static String hfileArchiveDir;
  protected static String replicationClusterId;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
    TEST_UTIL.startMiniCluster(3);
    RegionServerCoprocessorHost rsCpHost =
      TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getRegionServerCoprocessorHost();
    SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), rsCpHost);
    table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
    table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
    Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
    baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)).toString();
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)).toString();
    replicationClusterId = "12345";
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    STOPPABLE.stop("Shutting down");
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    table1 = TEST_UTIL.deleteTableData(TABLE_NAME1);
    table2 = TEST_UTIL.deleteTableData(TABLE_NAME2);
  }

  /**
   * Insert a whole batch of entries
   */
  @Test
  public void testBatchSink() throws Exception {
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
    List<ExtendedCell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
  }

  /**
   * Insert a mix of puts and deletes
   */
  @Test
  public void testMixedPutDelete() throws Exception {
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE / 2);
    List<ExtendedCell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE / 2; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);

    entries = new ArrayList<>(BATCH_SIZE);
    cells = new ArrayList<>();
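    // Second batch: odd rows get fresh Puts, even rows get DeleteColumn, so BATCH_SIZE / 2 rows
    // remain.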
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i,
        i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells));
    }

    SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(BATCH_SIZE / 2, scanRes.next(BATCH_SIZE).length);
  }

  @Test
  public void testLargeEditsPutDelete() throws Exception {
    List<WALEntry> entries = new ArrayList<>();
    List<ExtendedCell> cells = new ArrayList<>();
    for (int i = 0; i < 5510; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);

    ResultScanner resultScanner = table1.getScanner(new Scan());
    int totalRows = 0;
    while (resultScanner.next() != null) {
      totalRows++;
    }
    assertEquals(5510, totalRows);

    entries = new ArrayList<>();
    cells = new ArrayList<>();
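    // Second batch: odd rows are re-Put, even rows get DeleteColumn; of rows 0..10999 only the
    // 5500 odd-numbered rows survive.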
    for (int i = 0; i < 11000; i++) {
      entries.add(createEntry(TABLE_NAME1, i,
        i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells));
    }
    SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    resultScanner = table1.getScanner(new Scan());
    totalRows = 0;
    while (resultScanner.next() != null) {
      totalRows++;
    }
    assertEquals(5500, totalRows);
  }

  /**
   * Insert to 2 different tables
   */
  @Test
  public void testMixedPutTables() throws Exception {
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
    List<ExtendedCell> cells = new ArrayList<>();
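    // Even rows go to table2, odd rows to table1; the scans below verify the split.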
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }

    SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table2.getScanner(scan);
    for (Result res : scanRes) {
      assertEquals(0, Bytes.toInt(res.getRow()) % 2);
    }
    scanRes = table1.getScanner(scan);
    for (Result res : scanRes) {
      assertEquals(1, Bytes.toInt(res.getRow()) % 2);
    }
  }

  /**
   * Insert then do different types of deletes
   */
  @Test
  public void testMixedDeletes() throws Exception {
    List<WALEntry> entries = new ArrayList<>(3);
    List<ExtendedCell> cells = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    entries = new ArrayList<>(3);
    cells = new ArrayList<>();
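    // Delete each row a different way: DeleteColumn for rows 0 and 2, DeleteFamily for row 1.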
    entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells));
    entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
    entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells));

    SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);

    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(0, scanRes.next(3).length);
  }

  /**
   * Puts are buffered, but this tests the case where a Delete (which is not buffered) is applied
   * before the buffered Put that creates the row it targets.
   */
  @Test
  public void testApplyDeleteBeforePut() throws Exception {
    List<WALEntry> entries = new ArrayList<>(5);
    List<ExtendedCell> cells = new ArrayList<>();
    for (int i = 0; i < 2; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
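    // createEntry() sleeps 1ms per entry, so this DeleteFamily carries a later timestamp than the
    // Put on row 1 above and masks it even if the buffered Put is applied after the Delete.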
    entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
    for (int i = 3; i < 5; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Get get = new Get(Bytes.toBytes(1));
    Result res = table1.get(get);
    assertEquals(0, res.size());
  }

  @Test
  public void testRethrowRetriesExhaustedException() throws Exception {
    TableName notExistTable = TableName.valueOf("notExistTable");
    List<WALEntry> entries = new ArrayList<>();
    List<ExtendedCell> cells = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells));
    }
    try {
      SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()),
        replicationClusterId, baseNamespaceDir, hfileArchiveDir);
      Assert.fail("Should re-throw TableNotFoundException.");
    } catch (TableNotFoundException e) {
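      // expected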
    }
    entries.clear();
    cells.clear();
    for (int i = 0; i < 10; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
      try (Admin admin = conn.getAdmin()) {
        admin.disableTable(TABLE_NAME1);
        try {
          SINK.replicateEntries(entries,
            PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId,
            baseNamespaceDir, hfileArchiveDir);
          Assert.fail("Should re-throw RetriesExhaustedException.");
        } catch (RetriesExhaustedException e) {
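          // expected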
        } finally {
          admin.enableTable(TABLE_NAME1);
        }
      }
    }
  }

  /**
   * Test replicateEntries with a bulk load entry for 25 HFiles
   */
  @Test
  public void testReplicateEntriesForHFiles() throws Exception {
    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries");
    Path familyDir = new Path(dir, Bytes.toString(FAM_NAME1));
    int numRows = 10;
    List<Path> p = new ArrayList<>(1);
    final String hfilePrefix = "hfile-";

    // 1. Generate 25 hfile key ranges (50 distinct start/end boundary numbers)
    Random rand = ThreadLocalRandom.current();
    Set<Integer> numbers = new HashSet<>();
    while (numbers.size() < 50) {
      numbers.add(rand.nextInt(1000));
    }
    List<Integer> numberList = new ArrayList<>(numbers);
    Collections.sort(numberList);
    Map<String, Long> storeFilesSize = new HashMap<>(1);

    // 2. Create 25 hfiles
    Configuration conf = TEST_UTIL.getConfiguration();
    FileSystem fs = dir.getFileSystem(conf);
    Iterator<Integer> numbersItr = numberList.iterator();
    for (int i = 0; i < 25; i++) {
      Path hfilePath = new Path(familyDir, hfilePrefix + i);
      HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1,
        Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows);
      p.add(hfilePath);
      storeFilesSize.put(hfilePath.getName(), fs.getFileStatus(hfilePath).getLen());
    }

    // 3. Create a BulkLoadDescriptor and a WALEdit
    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
    storeFiles.put(FAM_NAME1, p);
    org.apache.hadoop.hbase.wal.WALEdit edit = null;
    WALProtos.BulkLoadDescriptor loadDescriptor = null;

    try (Connection c = ConnectionFactory.createConnection(conf);
      RegionLocator l = c.getRegionLocator(TABLE_NAME1)) {
      RegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegion();
      loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1,
        UnsafeByteOperations.unsafeWrap(regionInfo.getEncodedNameAsBytes()), storeFiles,
        storeFilesSize, 1);
      edit = org.apache.hadoop.hbase.wal.WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor);
    }
    List<WALEntry> entries = new ArrayList<>(1);

    // 4. Create a WALEntryBuilder
    WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1);

    // 5. Copy the hfiles to the paths where they would live in a real cluster
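    // Note: every destination path receives a copy of the first hfile (p.get(0)), so the bulk
    // load yields only numRows distinct rows, matching the assertion in step 8.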
    for (int i = 0; i < 25; i++) {
      String pathToHfileFromNS = new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString())
        .append(Path.SEPARATOR).append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR)
        .append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray()))
        .append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR)
        .append(hfilePrefix + i).toString();
      String dst = baseNamespaceDir + Path.SEPARATOR + pathToHfileFromNS;
      Path dstPath = new Path(dst);
      FileUtil.copy(fs, p.get(0), fs, dstPath, false, conf);
    }

    entries.add(builder.build());
    try (ResultScanner scanner = table1.getScanner(new Scan())) {
      // 6. Assert no existing data in table
      assertEquals(0, scanner.next(numRows).length);
    }
    // 7. Replicate the bulk loaded entry
    SINK.replicateEntries(entries,
      PrivateCellUtil
        .createExtendedCellScanner(WALEditInternalHelper.getExtendedCells(edit).iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    try (ResultScanner scanner = table1.getScanner(new Scan())) {
      // 8. Assert data is replicated
      assertEquals(numRows, scanner.next(numRows).length);
    }
    // Clean up the created hfiles, or they will mess up subsequent tests
  }

  /**
   * Test failure metrics produced for failed replication edits
   */
  @Test
  public void testFailedReplicationSinkMetrics() throws IOException {
    long initialFailedBatches = SINK.getSinkMetrics().getFailedBatches();
    long errorCount = 0L;
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
    List<ExtendedCell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    cells.clear(); // cause IndexOutOfBoundsException
    try {
      SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()),
        replicationClusterId, baseNamespaceDir, hfileArchiveDir);
      Assert.fail("Should re-throw ArrayIndexOutOfBoundsException.");
    } catch (ArrayIndexOutOfBoundsException e) {
      errorCount++;
      assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
    }

    entries.clear();
    cells.clear();
    TableName notExistTable = TableName.valueOf("notExistTable"); // cause TableNotFoundException
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells));
    }
    try {
      SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()),
        replicationClusterId, baseNamespaceDir, hfileArchiveDir);
      Assert.fail("Should re-throw TableNotFoundException.");
    } catch (TableNotFoundException e) {
      errorCount++;
      assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
    }

    entries.clear();
    cells.clear();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    // cause IOException in batch()
    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
      try (Admin admin = conn.getAdmin()) {
        admin.disableTable(TABLE_NAME1);
        try {
          SINK.replicateEntries(entries,
            PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId,
            baseNamespaceDir, hfileArchiveDir);
          Assert.fail("Should re-throw IOException.");
        } catch (IOException e) {
          errorCount++;
          assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
        } finally {
          admin.enableTable(TABLE_NAME1);
        }
      }
    }
  }

  private WALEntry createEntry(TableName table, int row, KeyValue.Type type,
    List<ExtendedCell> cells) {
    byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
    byte[] rowBytes = Bytes.toBytes(row);
    // Sleep 1ms to make sure two consecutive entries for the same key never share a timestamp
    try {
      Thread.sleep(1);
    } catch (InterruptedException e) {
      LOG.info("Was interrupted while sleeping, meh", e);
    }
    final long now = EnvironmentEdgeManager.currentTime();
    KeyValue kv;
    if (type.getCode() == KeyValue.Type.Put.getCode()) {
      kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.Put, Bytes.toBytes(row));
    } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
      kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.DeleteColumn);
    } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
      // DeleteFamily carries no qualifier
      kv = new KeyValue(rowBytes, fam, null, now, KeyValue.Type.DeleteFamily);
    } else {
      throw new IllegalArgumentException("Unsupported KeyValue type: " + type);
    }
    WALEntry.Builder builder = createWALEntryBuilder(table);
    cells.add(kv);

    return builder.build();
  }

  public static WALEntry.Builder createWALEntryBuilder(TableName table) {
    WALEntry.Builder builder = WALEntry.newBuilder();
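    // Each entry declares exactly one associated cell; the sink reads that many cells from the
    // cell scanner passed to replicateEntries for this entry.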
    builder.setAssociatedCellCount(1);
    WALKey.Builder keyBuilder = WALKey.newBuilder();
    UUID.Builder uuidBuilder = UUID.newBuilder();
    uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits());
    uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
    keyBuilder.setClusterId(uuidBuilder.build());
    keyBuilder.setTableName(UnsafeByteOperations.unsafeWrap(table.getName()));
    keyBuilder.setWriteTime(EnvironmentEdgeManager.currentTime());
    keyBuilder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(HConstants.EMPTY_BYTE_ARRAY));
    keyBuilder.setLogSequenceNumber(-1);
    builder.setKey(keyBuilder.build());
    return builder;
  }
}