001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertThrows; 025import static org.junit.Assert.assertTrue; 026 027import java.io.EOFException; 028import java.io.IOException; 029import java.util.stream.IntStream; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.KeyValue; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.RegionInfo; 036import org.apache.hadoop.hbase.client.RegionInfoBuilder; 037import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.hbase.wal.WAL; 040import org.apache.hadoop.hbase.wal.WALEdit; 041import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 042import org.apache.hadoop.hbase.wal.WALKeyImpl; 043import org.apache.hadoop.hbase.wal.WALProvider; 044 045/** 046 * Helper class for testing protobuf log. 047 */ 048public final class ProtobufLogTestHelper { 049 050 private ProtobufLogTestHelper() { 051 } 052 053 private static byte[] toValue(int prefix, int suffix) { 054 return Bytes.toBytes(prefix + "-" + suffix); 055 } 056 057 private static RegionInfo toRegionInfo(TableName tableName) { 058 return RegionInfoBuilder.newBuilder(tableName).setRegionId(1024).build(); 059 } 060 061 private static WAL.Entry generateEdit(int i, RegionInfo hri, TableName tableName, byte[] row, 062 int columnCount, long timestamp, MultiVersionConcurrencyControl mvcc) { 063 WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, i, timestamp, 064 HConstants.DEFAULT_CLUSTER_ID, mvcc); 065 WALEdit edit = new WALEdit(); 066 int prefix = i; 067 IntStream.range(0, columnCount).mapToObj(j -> toValue(prefix, j)) 068 .map(value -> new KeyValue(row, row, row, timestamp, value)) 069 .forEachOrdered(c -> WALEditInternalHelper.addExtendedCell(edit, c)); 070 return new WAL.Entry(key, edit); 071 } 072 073 public static void doWrite(WALProvider.Writer writer, boolean withTrailer, TableName tableName, 074 int columnCount, int recordCount, byte[] row, long timestamp) throws IOException { 075 RegionInfo hri = toRegionInfo(tableName); 076 for (int i = 0; i < recordCount; i++) { 077 writer.append(generateEdit(i, hri, tableName, row, columnCount, timestamp, null)); 078 } 079 writer.sync(false); 080 if (withTrailer) { 081 writer.close(); 082 } 083 } 084 085 public static void doWrite(WAL wal, RegionInfo hri, TableName tableName, int columnCount, 086 int recordCount, byte[] row, long timestamp, MultiVersionConcurrencyControl mvcc) 087 throws IOException { 088 for (int i = 0; i < recordCount; i++) { 089 WAL.Entry entry = generateEdit(i, hri, tableName, row, columnCount, timestamp, mvcc); 090 wal.appendData(hri, entry.getKey(), entry.getEdit()); 091 } 092 wal.sync(); 093 } 094 095 public static void doRead(ProtobufWALStreamReader reader, boolean withTrailer, RegionInfo hri, 096 TableName tableName, int columnCount, int recordCount, byte[] row, long timestamp) 097 throws IOException { 098 if (withTrailer) { 099 assertNotNull(reader.trailer); 100 } else { 101 assertNull(reader.trailer); 102 } 103 for (int i = 0; i < recordCount; ++i) { 104 WAL.Entry entry = reader.next(); 105 assertNotNull(entry); 106 assertEquals(columnCount, entry.getEdit().size()); 107 assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName()); 108 assertEquals(tableName, entry.getKey().getTableName()); 109 int idx = 0; 110 for (Cell val : entry.getEdit().getCells()) { 111 assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(), 112 val.getRowLength())); 113 assertArrayEquals(toValue(i, idx), CellUtil.cloneValue(val)); 114 idx++; 115 } 116 } 117 if (withTrailer) { 118 // we can finish normally 119 assertNull(reader.next()); 120 } else { 121 // we will get an EOF since there is no trailer 122 assertThrows(EOFException.class, () -> reader.next()); 123 } 124 } 125 126 public static void doRead(ProtobufWALStreamReader reader, boolean withTrailer, 127 TableName tableName, int columnCount, int recordCount, byte[] row, long timestamp) 128 throws IOException { 129 doRead(reader, withTrailer, toRegionInfo(tableName), tableName, columnCount, recordCount, row, 130 timestamp); 131 } 132}