001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import org.apache.hadoop.hbase.HBaseClassTestRule; 025import org.apache.hadoop.hbase.HConstants; 026import org.apache.hadoop.hbase.client.Get; 027import org.apache.hadoop.hbase.client.Put; 028import org.apache.hadoop.hbase.client.Result; 029import org.apache.hadoop.hbase.replication.TestReplicationBase; 030import org.apache.hadoop.hbase.testclassification.MediumTests; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.junit.AfterClass; 033import org.junit.BeforeClass; 034import org.junit.ClassRule; 035import org.junit.Test; 036import org.junit.experimental.categories.Category; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040@Category(MediumTests.class) 041public class TestReplicationCompressedWAL extends TestReplicationBase { 042 043 @ClassRule 044 public static final HBaseClassTestRule CLASS_RULE = 045 HBaseClassTestRule.forClass(TestReplicationCompressedWAL.class); 046 047 static final Logger LOG = LoggerFactory.getLogger(TestReplicationCompressedWAL.class); 048 static final int NUM_BATCHES = 20; 049 static final int NUM_ROWS_PER_BATCH = 100; 050 051 @BeforeClass 052 public static void setUpBeforeClass() throws Exception { 053 CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); 054 TestReplicationBase.setUpBeforeClass(); 055 } 056 057 @AfterClass 058 public static void tearDownAfterClass() throws Exception { 059 TestReplicationBase.tearDownAfterClass(); 060 } 061 062 @Test 063 public void testMultiplePuts() throws Exception { 064 runMultiplePutTest(); 065 } 066 067 protected static void runMultiplePutTest() throws IOException, InterruptedException { 068 for (int i = 0; i < NUM_BATCHES; i++) { 069 putBatch(i); 070 getBatch(i); 071 } 072 } 073 074 protected static void getBatch(int batch) throws IOException, InterruptedException { 075 for (int i = 0; i < NUM_ROWS_PER_BATCH; i++) { 076 byte[] row = getRowKey(batch, i); 077 Get get = new Get(row); 078 for (int j = 0; j < NB_RETRIES; j++) { 079 if (j == NB_RETRIES - 1) { 080 fail("Waited too much time for replication"); 081 } 082 Result res = htable2.get(get); 083 if (res.isEmpty()) { 084 LOG.info("Row not available"); 085 Thread.sleep(SLEEP_TIME); 086 } else { 087 assertArrayEquals(row, res.value()); 088 break; 089 } 090 } 091 } 092 } 093 094 protected static byte[] getRowKey(int batch, int count) { 095 return Bytes.toBytes("row" + ((batch * NUM_ROWS_PER_BATCH) + count)); 096 } 097 098 protected static void putBatch(int batch) throws IOException { 099 for (int i = 0; i < NUM_ROWS_PER_BATCH; i++) { 100 byte[] row = getRowKey(batch, i); 101 Put put = new Put(row); 102 put.addColumn(famName, row, row); 103 htable1.put(put); 104 } 105 } 106 107}