001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.junit.Assert.assertEquals; 021 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.CountDownLatch; 025import org.apache.hadoop.hbase.HBaseTestingUtility; 026import org.apache.hadoop.hbase.TableName; 027import org.apache.hadoop.hbase.client.Durability; 028import org.apache.hadoop.hbase.client.Get; 029import org.apache.hadoop.hbase.client.Put; 030import org.apache.hadoop.hbase.client.Result; 031import org.apache.hadoop.hbase.client.Table; 032import org.apache.hadoop.hbase.util.Bytes; 033import org.junit.Test; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037public abstract class WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase { 038 039 private static final Logger LOG = 040 LoggerFactory.getLogger(WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase.class); 041 042 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 043 044 protected static CountDownLatch ARRIVE; 045 046 protected static CountDownLatch RESUME; 047 048 protected static TableName TABLE_NAME = 049 TableName.valueOf("WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase"); 050 051 protected static byte[] CF = Bytes.toBytes("cf"); 052 053 protected static byte[] CQ = Bytes.toBytes("cq"); 054 055 private byte[] getBytes(String prefix, int index) { 056 return Bytes.toBytes(String.format("%s-%08d", prefix, index)); 057 } 058 059 @Test 060 public void test() throws Exception { 061 LOG.info("Stop WAL appending..."); 062 ARRIVE = new CountDownLatch(1); 063 RESUME = new CountDownLatch(1); 064 try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) { 065 LOG.info("Put totally 100 rows in batches of 5 with " + Durability.ASYNC_WAL + "..."); 066 int batchSize = 5; 067 List<Put> puts = new ArrayList<>(batchSize); 068 for (int i = 1; i <= 100; i++) { 069 Put p = new Put(getBytes("row", i)).addColumn(CF, CQ, getBytes("value", i)) 070 .setDurability(Durability.ASYNC_WAL); 071 puts.add(p); 072 if (i % batchSize == 0) { 073 table.put(puts); 074 LOG.info("Wrote batch of {} rows from row {}", batchSize, 075 Bytes.toString(puts.get(0).getRow())); 076 puts.clear(); 077 // Wait for few of the minibatches in 1st batch of puts to go through the WAL write. 078 // The WAL write will pause then 079 if (ARRIVE != null) { 080 ARRIVE.await(); 081 ARRIVE = null; 082 } 083 } 084 } 085 LOG.info("Resume WAL appending..."); 086 RESUME.countDown(); 087 LOG.info("Put a single row to force a WAL sync..."); 088 table.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("value"))); 089 LOG.info("Abort the only region server"); 090 UTIL.getMiniHBaseCluster().abortRegionServer(0); 091 LOG.info("Start a new region server"); 092 UTIL.getMiniHBaseCluster().startRegionServerAndWait(30000); 093 UTIL.waitTableAvailable(TABLE_NAME); 094 LOG.info("Check if all rows are still valid"); 095 for (int i = 1; i <= 100; i++) { 096 Result result = table.get(new Get(getBytes("row", i))); 097 assertEquals(Bytes.toString(getBytes("value", i)), Bytes.toString(result.getValue(CF, CQ))); 098 } 099 Result result = table.get(new Get(Bytes.toBytes("row"))); 100 assertEquals("value", Bytes.toString(result.getValue(CF, CQ))); 101 } 102 } 103}