001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor.example.row.stats; 019 020import static org.apache.hadoop.hbase.util.TestRegionSplitCalculator.TEST_UTIL; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.Optional; 027import java.util.concurrent.ThreadLocalRandom; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellScanner; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.Connection; 034import org.apache.hadoop.hbase.client.ConnectionFactory; 035import org.apache.hadoop.hbase.client.Delete; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsRecorder; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.junit.AfterClass; 041import org.junit.Before; 042import org.junit.BeforeClass; 043import org.junit.Test; 044 045public class TestRowStatisticsCompactionObserver { 046 047 public static final TestableRowStatisticsRecorder RECORDER = new TestableRowStatisticsRecorder(); 048 private static final TableName TABLE_NAME = TableName.valueOf("test-table"); 049 private static final byte[] FAMILY = Bytes.toBytes("0"); 050 private static SingleProcessHBaseCluster cluster; 051 private static Connection connection; 052 private static Table table; 053 054 @BeforeClass 055 public static void setUpClass() throws Exception { 056 cluster = TEST_UTIL.startMiniCluster(1); 057 connection = ConnectionFactory.createConnection(cluster.getConf()); 058 table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 1, 059 HConstants.DEFAULT_BLOCKSIZE, TestableRowStatisticsCompactionObserver.class.getName()); 060 } 061 062 @Before 063 public void setUp() throws Exception { 064 RECORDER.clear(); 065 } 066 067 @AfterClass 068 public static void afterClass() throws Exception { 069 cluster.close(); 070 TEST_UTIL.shutdownMiniCluster(); 071 table.close(); 072 connection.close(); 073 } 074 075 @Test 076 public void itRecordsStats() throws IOException, InterruptedException { 077 int numRows = 10; 078 int largestRowNum = -1; 079 int largestRowSize = 0; 080 081 int largestCellRowNum = -1; 082 int largestCellColNum = -1; 083 long largestCellSize = 0; 084 085 for (int i = 0; i < numRows; i++) { 086 int cells = ThreadLocalRandom.current().nextInt(1000) + 10; 087 088 Put p = new Put(Bytes.toBytes(i)); 089 for (int j = 0; j < cells; j++) { 090 byte[] val = new byte[ThreadLocalRandom.current().nextInt(100) + 1]; 091 p.addColumn(FAMILY, Bytes.toBytes(j), val); 092 } 093 094 int rowSize = 0; 095 CellScanner cellScanner = p.cellScanner(); 096 int j = 0; 097 while (cellScanner.advance()) { 098 Cell current = cellScanner.current(); 099 int serializedSize = current.getSerializedSize(); 100 if (serializedSize > largestCellSize) { 101 largestCellSize = serializedSize; 102 largestCellRowNum = i; 103 largestCellColNum = j; 104 } 105 rowSize += serializedSize; 106 j++; 107 } 108 109 if (rowSize > largestRowSize) { 110 largestRowNum = i; 111 largestRowSize = rowSize; 112 } 113 114 table.put(p); 115 connection.getAdmin().flush(table.getName()); 116 } 117 118 for (int i = 0; i < numRows; i++) { 119 Delete d = new Delete(Bytes.toBytes(i)); 120 d.addColumn(FAMILY, Bytes.toBytes(0)); 121 table.delete(d); 122 } 123 124 System.out.println("Final flush"); 125 connection.getAdmin().flush(table.getName()); 126 Thread.sleep(5000); 127 System.out.println("Compacting"); 128 129 RowStatisticsImpl lastStats = RECORDER.getLastStats(); // Just initialize 130 Boolean lastIsMajor = RECORDER.getLastIsMajor(); 131 connection.getAdmin().compact(table.getName()); 132 while (lastStats == null) { 133 Thread.sleep(1000); 134 135 System.out.println("Checking stats"); 136 lastStats = RECORDER.getLastStats(); 137 lastIsMajor = RECORDER.getLastIsMajor(); 138 } 139 assertFalse(lastIsMajor); 140 assertEquals(lastStats.getTotalDeletesCount(), 10); 141 assertEquals(lastStats.getTotalRowsCount(), 10); 142 143 RECORDER.clear(); 144 lastStats = RECORDER.getLastStats(); 145 lastIsMajor = RECORDER.getLastIsMajor(); 146 connection.getAdmin().majorCompact(table.getName()); 147 148 // Must wait for async majorCompaction to complete 149 while (lastStats == null) { 150 Thread.sleep(1000); 151 152 System.out.println("Checking stats"); 153 lastStats = RECORDER.getLastStats(); 154 lastIsMajor = RECORDER.getLastIsMajor(); 155 } 156 assertTrue(lastIsMajor); 157 // no deletes after major compact 158 assertEquals(lastStats.getTotalDeletesCount(), 0); 159 assertEquals(lastStats.getTotalRowsCount(), 10); 160 // can only check largest values after major compact, since the above minor compact might not 161 // contain all storefiles 162 assertEquals(Bytes.toInt(lastStats.getLargestRow()), largestRowNum); 163 assertEquals( 164 Bytes.toInt(lastStats.getLargestCell().getRowArray(), 165 lastStats.getLargestCell().getRowOffset(), lastStats.getLargestCell().getRowLength()), 166 largestCellRowNum); 167 assertEquals(Bytes.toInt(lastStats.getLargestCell().getQualifierArray(), 168 lastStats.getLargestCell().getQualifierOffset(), 169 lastStats.getLargestCell().getQualifierLength()), largestCellColNum); 170 } 171 172 public static class TestableRowStatisticsCompactionObserver 173 extends RowStatisticsCompactionObserver { 174 175 public TestableRowStatisticsCompactionObserver() { 176 super(TestRowStatisticsCompactionObserver.RECORDER); 177 } 178 } 179 180 public static class TestableRowStatisticsRecorder implements RowStatisticsRecorder { 181 182 private volatile RowStatisticsImpl lastStats = null; 183 private volatile Boolean lastIsMajor = null; 184 185 @Override 186 public void record(RowStatisticsImpl stats, Optional<byte[]> fullRegionName) { 187 System.out.println("Record called with isMajor=" + stats.isMajor() + ", stats=" + stats 188 + ", fullRegionName=" + fullRegionName); 189 lastStats = stats; 190 } 191 192 public void clear() { 193 lastStats = null; 194 lastIsMajor = null; 195 } 196 197 public RowStatisticsImpl getLastStats() { 198 return lastStats; 199 } 200 201 public Boolean getLastIsMajor() { 202 return lastIsMajor; 203 } 204 } 205}