001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor.example.row.stats;
019
020import static org.apache.hadoop.hbase.util.TestRegionSplitCalculator.TEST_UTIL;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.Optional;
027import java.util.concurrent.ThreadLocalRandom;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellScanner;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Connection;
034import org.apache.hadoop.hbase.client.ConnectionFactory;
035import org.apache.hadoop.hbase.client.Delete;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsRecorder;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.junit.AfterClass;
041import org.junit.Before;
042import org.junit.BeforeClass;
043import org.junit.Test;
044
045public class TestRowStatisticsCompactionObserver {
046
047  public static final TestableRowStatisticsRecorder RECORDER = new TestableRowStatisticsRecorder();
048  private static final TableName TABLE_NAME = TableName.valueOf("test-table");
049  private static final byte[] FAMILY = Bytes.toBytes("0");
050  private static SingleProcessHBaseCluster cluster;
051  private static Connection connection;
052  private static Table table;
053
054  @BeforeClass
055  public static void setUpClass() throws Exception {
056    cluster = TEST_UTIL.startMiniCluster(1);
057    connection = ConnectionFactory.createConnection(cluster.getConf());
058    table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 1,
059      HConstants.DEFAULT_BLOCKSIZE, TestableRowStatisticsCompactionObserver.class.getName());
060  }
061
062  @Before
063  public void setUp() throws Exception {
064    RECORDER.clear();
065  }
066
067  @AfterClass
068  public static void afterClass() throws Exception {
069    cluster.close();
070    TEST_UTIL.shutdownMiniCluster();
071    table.close();
072    connection.close();
073  }
074
075  @Test
076  public void itRecordsStats() throws IOException, InterruptedException {
077    int numRows = 10;
078    int largestRowNum = -1;
079    int largestRowSize = 0;
080
081    int largestCellRowNum = -1;
082    int largestCellColNum = -1;
083    long largestCellSize = 0;
084
085    for (int i = 0; i < numRows; i++) {
086      int cells = ThreadLocalRandom.current().nextInt(1000) + 10;
087
088      Put p = new Put(Bytes.toBytes(i));
089      for (int j = 0; j < cells; j++) {
090        byte[] val = new byte[ThreadLocalRandom.current().nextInt(100) + 1];
091        p.addColumn(FAMILY, Bytes.toBytes(j), val);
092      }
093
094      int rowSize = 0;
095      CellScanner cellScanner = p.cellScanner();
096      int j = 0;
097      while (cellScanner.advance()) {
098        Cell current = cellScanner.current();
099        int serializedSize = current.getSerializedSize();
100        if (serializedSize > largestCellSize) {
101          largestCellSize = serializedSize;
102          largestCellRowNum = i;
103          largestCellColNum = j;
104        }
105        rowSize += serializedSize;
106        j++;
107      }
108
109      if (rowSize > largestRowSize) {
110        largestRowNum = i;
111        largestRowSize = rowSize;
112      }
113
114      table.put(p);
115      connection.getAdmin().flush(table.getName());
116    }
117
118    for (int i = 0; i < numRows; i++) {
119      Delete d = new Delete(Bytes.toBytes(i));
120      d.addColumn(FAMILY, Bytes.toBytes(0));
121      table.delete(d);
122    }
123
124    System.out.println("Final flush");
125    connection.getAdmin().flush(table.getName());
126    Thread.sleep(5000);
127    System.out.println("Compacting");
128
129    RowStatisticsImpl lastStats = RECORDER.getLastStats(); // Just initialize
130    Boolean lastIsMajor = RECORDER.getLastIsMajor();
131    connection.getAdmin().compact(table.getName());
132    while (lastStats == null) {
133      Thread.sleep(1000);
134
135      System.out.println("Checking stats");
136      lastStats = RECORDER.getLastStats();
137      lastIsMajor = RECORDER.getLastIsMajor();
138    }
139    assertFalse(lastIsMajor);
140    assertEquals(lastStats.getTotalDeletesCount(), 10);
141    assertEquals(lastStats.getTotalRowsCount(), 10);
142
143    RECORDER.clear();
144    lastStats = RECORDER.getLastStats();
145    lastIsMajor = RECORDER.getLastIsMajor();
146    connection.getAdmin().majorCompact(table.getName());
147
148    // Must wait for async majorCompaction to complete
149    while (lastStats == null) {
150      Thread.sleep(1000);
151
152      System.out.println("Checking stats");
153      lastStats = RECORDER.getLastStats();
154      lastIsMajor = RECORDER.getLastIsMajor();
155    }
156    assertTrue(lastIsMajor);
157    // no deletes after major compact
158    assertEquals(lastStats.getTotalDeletesCount(), 0);
159    assertEquals(lastStats.getTotalRowsCount(), 10);
160    // can only check largest values after major compact, since the above minor compact might not
161    // contain all storefiles
162    assertEquals(Bytes.toInt(lastStats.getLargestRow()), largestRowNum);
163    assertEquals(
164      Bytes.toInt(lastStats.getLargestCell().getRowArray(),
165        lastStats.getLargestCell().getRowOffset(), lastStats.getLargestCell().getRowLength()),
166      largestCellRowNum);
167    assertEquals(Bytes.toInt(lastStats.getLargestCell().getQualifierArray(),
168      lastStats.getLargestCell().getQualifierOffset(),
169      lastStats.getLargestCell().getQualifierLength()), largestCellColNum);
170  }
171
172  public static class TestableRowStatisticsCompactionObserver
173    extends RowStatisticsCompactionObserver {
174
175    public TestableRowStatisticsCompactionObserver() {
176      super(TestRowStatisticsCompactionObserver.RECORDER);
177    }
178  }
179
180  public static class TestableRowStatisticsRecorder implements RowStatisticsRecorder {
181
182    private volatile RowStatisticsImpl lastStats = null;
183    private volatile Boolean lastIsMajor = null;
184
185    @Override
186    public void record(RowStatisticsImpl stats, Optional<byte[]> fullRegionName) {
187      System.out.println("Record called with isMajor=" + stats.isMajor() + ", stats=" + stats
188        + ", fullRegionName=" + fullRegionName);
189      lastStats = stats;
190    }
191
192    public void clear() {
193      lastStats = null;
194      lastIsMajor = null;
195    }
196
197    public RowStatisticsImpl getLastStats() {
198      return lastStats;
199    }
200
201    public Boolean getLastIsMajor() {
202      return lastIsMajor;
203    }
204  }
205}