/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests changing data block encoding settings of a column family.
 * <p>
 * Each test creates its own table, then repeatedly changes the data block encoding of the single
 * column family, writes a fresh batch of rows and re-reads every batch written so far.
 */
@Category({ IOTests.class, LargeTests.class })
public class TestChangingEncoding {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestChangingEncoding.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestChangingEncoding.class);

  /** Name of the only column family of the test tables. */
  static final String CF = "EncodingTestCF";
  /** {@link #CF} as bytes. */
  static final byte[] CF_BYTES = Bytes.toBytes(CF);

  /** Number of rows written by one call to {@link #writeTestDataBatch(TableName, int)}. */
  private static final int NUM_ROWS_PER_BATCH = 100;
  /** Number of columns written into every row. */
  private static final int NUM_COLS_PER_ROW = 20;

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final Configuration conf = TEST_UTIL.getConfiguration();

  /** Upper bound, in milliseconds, passed to the wait for regions to leave transition. */
  private static final int TIMEOUT_MS = 600000;

  /** Builder of the column family descriptor; mutated on every encoding change. */
  private ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder;

  /** Table used by the currently running test; set by {@link #prepareTest(String)}. */
  private TableName tableName;

  /** Encodings that the deterministic tests cycle through, in order. */
  private static final List<DataBlockEncoding> ENCODINGS_TO_ITERATE = createEncodingsToIterate();

  /**
   * Builds the list of encodings to iterate over: every {@link DataBlockEncoding} value in
   * declaration order, followed by one additional {@link DataBlockEncoding#NONE} entry.
   * @return an unmodifiable list of encodings
   */
  private static List<DataBlockEncoding> createEncodingsToIterate() {
    List<DataBlockEncoding> encodings = new ArrayList<>(Arrays.asList(DataBlockEncoding.values()));
    encodings.add(DataBlockEncoding.NONE);
    return Collections.unmodifiableList(encodings);
  }

  /** A zero-based index of the current batch of test data being written */
  private int numBatchesWritten;

  /**
   * Creates the table {@code test_table_<testId>} with the single column family {@link #CF} and
   * resets the batch counter.
   * @param testId suffix that makes the table name unique per test
   * @throws IOException if obtaining the admin or creating the table fails
   */
  private void prepareTest(String testId) throws IOException {
    tableName = TableName.valueOf("test_table_" + testId);
    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
    columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(CF_BYTES);
    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
    try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
      admin.createTable(tableDescriptorBuilder.build());
    }
    numBatchesWritten = 0;
  }

  /**
   * Configures and starts the mini cluster shared by all tests of this class.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Use a small flush size to create more HFiles.
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
    // Disable splits so that a split cannot make the column family modification wait, which
    // sometimes causes the test to time out.
    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
      "org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy");
    TEST_UTIL.startMiniCluster();
  }

  /**
   * Shuts down the mini cluster started by {@link #setUpBeforeClass()}.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * @param batchId index of the batch the row belongs to
   * @param i       index of the row within the batch
   * @return the row key {@code batch<batchId>_row<i>} as bytes
   */
  private static byte[] getRowKey(int batchId, int i) {
    return Bytes.toBytes("batch" + batchId + "_row" + i);
  }

  /**
   * @param j index of the column within a row
   * @return the qualifier {@code col<j>} as bytes
   */
  private static byte[] getQualifier(int j) {
    return Bytes.toBytes("col" + j);
  }

  /**
   * @param batchId index of the batch
   * @param i       index of the row within the batch
   * @param j       index of the column within the row
   * @return the value {@code value_for_<rowKey>_col<j>} as bytes
   */
  private static byte[] getValue(int batchId, int i, int j) {
    return Bytes.toBytes("value_for_" + Bytes.toString(getRowKey(batchId, i)) + "_col" + j);
  }

  /**
   * Writes one batch of {@link #NUM_ROWS_PER_BATCH} rows with {@link #NUM_COLS_PER_ROW} columns
   * each into the given table. The puts skip the WAL.
   * @param tableName table to write to
   * @param batchId   index of the batch; determines row keys and values
   */
  static void writeTestDataBatch(TableName tableName, int batchId) throws Exception {
    LOG.debug("Writing test data batch {}", batchId);
    List<Put> puts = new ArrayList<>(NUM_ROWS_PER_BATCH);
    for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
      Put put = new Put(getRowKey(batchId, i));
      for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
        put.addColumn(CF_BYTES, getQualifier(j), getValue(batchId, i, j));
      }
      put.setDurability(Durability.SKIP_WAL);
      puts.add(put);
    }
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(tableName)) {
      table.put(puts);
    }
  }

  /**
   * Reads back every row of the given batch and asserts that each cell that is present carries
   * the value produced by {@link #getValue(int, int, int)}.
   * @param tableName table to read from
   * @param batchId   index of the batch to verify
   */
  static void verifyTestDataBatch(TableName tableName, int batchId) throws Exception {
    LOG.debug("Verifying test data batch {}", batchId);
    // try-with-resources so the table is closed even when a get or an assertion fails.
    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
        byte[] rowKey = getRowKey(batchId, i);
        Result result = table.get(new Get(rowKey));
        for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
          byte[] qualifier = getQualifier(j);
          Cell kv = result.getColumnLatestCell(CF_BYTES, qualifier);
          if (kv == null) {
            // NOTE(review): a missing cell is silently skipped rather than reported; confirm
            // whether this leniency is intentional before tightening it.
            continue;
          }
          assertTrue(
            "Unexpected value in row " + Bytes.toString(rowKey) + ", column "
              + Bytes.toString(qualifier),
            CellUtil.matchingValue(kv, getValue(batchId, i, j)));
        }
      }
    }
  }

  /**
   * Writes the next batch of test data into the current table and advances the batch counter.
   */
  private void writeSomeNewData() throws Exception {
    writeTestDataBatch(tableName, numBatchesWritten);
    ++numBatchesWritten;
  }

  /**
   * Verifies every batch written so far into the current table.
   */
  private void verifyAllData() throws Exception {
    for (int i = 0; i < numBatchesWritten; ++i) {
      verifyTestDataBatch(tableName, i);
    }
  }

  /**
   * Changes the data block encoding of the test column family.
   * @param encoding     the encoding to switch to
   * @param onlineChange if {@code false}, the table is disabled before the change and enabled
   *                     again afterwards; if {@code true}, the table stays enabled
   */
  private void setEncodingConf(DataBlockEncoding encoding, boolean onlineChange) throws Exception {
    LOG.debug("Setting CF encoding to {} (ordinal={}), onlineChange={}", encoding,
      encoding.ordinal(), onlineChange);
    columnFamilyDescriptorBuilder.setDataBlockEncoding(encoding);
    try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
      if (!onlineChange) {
        admin.disableTable(tableName);
      }
      admin.modifyColumnFamily(tableName, columnFamilyDescriptorBuilder.build());
      if (!onlineChange) {
        admin.enableTable(tableName);
      }
    }
    // This is a unit test, not integration test. So let's
    // wait for regions out of transition. Otherwise, for online
    // encoding change, verification phase may be flaky because
    // regions could be still in transition.
    TEST_UTIL.waitUntilNoRegionsInTransition(TIMEOUT_MS);
  }

  /**
   * Cycles through all encodings, first with offline and then with online changes, writing and
   * verifying data after every change.
   */
  @Test
  public void testChangingEncoding() throws Exception {
    prepareTest("ChangingEncoding");
    for (boolean onlineChange : new boolean[] { false, true }) {
      for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) {
        setEncodingConf(encoding, onlineChange);
        writeSomeNewData();
        verifyAllData();
      }
    }
  }

  /**
   * Same as {@link #testChangingEncoding()}, but additionally requests a major compaction after
   * every change and verifies the data again afterwards.
   */
  @Test
  public void testChangingEncodingWithCompaction() throws Exception {
    prepareTest("ChangingEncodingWithCompaction");
    for (boolean onlineChange : new boolean[] { false, true }) {
      for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) {
        setEncodingConf(encoding, onlineChange);
        writeSomeNewData();
        verifyAllData();
        compactAndWait();
        verifyAllData();
      }
    }
  }

  /**
   * Requests a major compaction of the current table and blocks until the compaction queue of
   * region server 0 is empty.
   */
  private void compactAndWait() throws IOException, InterruptedException {
    LOG.debug("Compacting table {}", tableName);
    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
    TEST_UTIL.getAdmin().majorCompact(tableName);

    // Give the compaction up to 0.5s to show up in the queue before waiting for it to drain;
    // otherwise the drain loop below could exit before the request was even enqueued.
    final long maxWaitTime = EnvironmentEdgeManager.currentTime() + 500;
    boolean queueStillEmpty;
    do {
      queueStillEmpty = rs.getCompactSplitThread().getCompactionQueueSize() == 0;
      Threads.sleep(1);
    } while (queueStillEmpty && EnvironmentEdgeManager.currentTime() < maxWaitTime);

    while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) {
      Threads.sleep(1);
    }
    LOG.debug("Compaction queue size reached 0, continuing");
  }

  /**
   * Applies ten randomly chosen encodings, each randomly online or offline, writing and
   * verifying data after every change.
   */
  @Test
  public void testCrazyRandomChanges() throws Exception {
    prepareTest("RandomChanges");
    Random rand = ThreadLocalRandom.current();
    DataBlockEncoding[] allEncodings = DataBlockEncoding.values();
    for (int i = 0; i < 10; ++i) {
      DataBlockEncoding encoding = allEncodings[rand.nextInt(allEncodings.length)];
      setEncodingConf(encoding, rand.nextBoolean());
      writeSomeNewData();
      verifyAllData();
    }
  }
}