/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Testcase for HBASE-22539.
 * <p>
 * The test writes a batch of rows with {@link Durability#ASYNC_WAL}, waits on {@link #ARRIVE},
 * releases {@link #RESUME}, writes one more row, aborts the region server, starts a new one and
 * then checks that every row written before the abort can still be read back with its original
 * value.
 * <p>
 * This class only creates and signals the two latches. NOTE(review): presumably the WAL
 * implementation installed by each concrete subclass counts down {@link #ARRIVE} and then blocks
 * on {@link #RESUME}; confirm against the subclasses.
 */
public abstract class WALCorruptionDueToDanglingByteBufferTestBase {

  // Use this class's own logger category rather than the category of one specific subclass.
  private static final Logger LOG =
    LoggerFactory.getLogger(WALCorruptionDueToDanglingByteBufferTestBase.class);

  /** Number of rows written with ASYNC_WAL durability and verified after the restart. */
  private static final int ROW_COUNT = 100;

  /** Shared testing utility; cluster start/stop is not performed in this class. */
  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  /** Latch the test thread awaits before resuming; recreated by {@link #test()}. */
  protected static CountDownLatch ARRIVE;

  /** Latch the test thread counts down once {@link #ARRIVE} has been reached. */
  protected static CountDownLatch RESUME;

  /** Name of the table the test reads from and writes to. */
  protected static TableName TABLE_NAME = TableName.valueOf("Corruption");

  /** Column family used for every cell written by the test. */
  protected static byte[] CF = Bytes.toBytes("cf");

  /** Column qualifier used for every cell written by the test. */
  protected static byte[] CQ = Bytes.toBytes("cq");

  /**
   * Builds the bytes of {@code "<prefix>-<index>"}, with the index zero-padded to 8 digits.
   * @param prefix text placed before the dash
   * @param index number appended after the dash
   * @return the formatted string converted with {@link Bytes#toBytes(String)}
   */
  private byte[] getBytes(String prefix, int index) {
    return Bytes.toBytes(String.format("%s-%08d", prefix, index));
  }

  /**
   * Writes {@value #ROW_COUNT} rows with ASYNC_WAL durability plus one row with the default
   * durability, restarts the only region server, and asserts that all rows are readable with
   * the values that were written.
   * @throws Exception if any client or cluster operation fails
   */
  @Test
  public void test() throws Exception {
    LOG.info("Stop WAL appending...");
    ARRIVE = new CountDownLatch(1);
    RESUME = new CountDownLatch(1);
    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
      LOG.info("Put {} rows with {}...", ROW_COUNT, Durability.ASYNC_WAL);
      for (int i = 0; i < ROW_COUNT; i++) {
        Put put = new Put(getBytes("row", i)).addColumn(CF, CQ, getBytes("value", i))
          .setDurability(Durability.ASYNC_WAL);
        table.batch(Arrays.asList(put), new Object[1]);
      }
      ARRIVE.await();
      // Clear the latch once it has been reached. NOTE(review): presumably the WAL hook checks
      // this field for null so that later appends are not paused again; confirm in subclasses.
      ARRIVE = null;
      LOG.info("Resume WAL appending...");
      RESUME.countDown();
      LOG.info("Put a single row to force a WAL sync...");
      table.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("value")));
      LOG.info("Abort the only region server");
      UTIL.getMiniHBaseCluster().abortRegionServer(0);
      LOG.info("Start a new region server");
      UTIL.getMiniHBaseCluster().startRegionServerAndWait(30000);
      UTIL.waitTableAvailable(TABLE_NAME);
      LOG.info("Check if all rows are still valid");
      for (int i = 0; i < ROW_COUNT; i++) {
        Result result = table.get(new Get(getBytes("row", i)));
        // Include the row index so a failure identifies which row came back wrong.
        assertEquals("Unexpected value for row index " + i,
          Bytes.toString(getBytes("value", i)), Bytes.toString(result.getValue(CF, CQ)));
      }
      Result result = table.get(new Get(Bytes.toBytes("row")));
      assertEquals("value", Bytes.toString(result.getValue(CF, CQ)));
    }
  }
}