001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.HasNext;
028import org.junit.Before;
029import org.junit.Test;
030import org.junit.runners.Parameterized.Parameter;
031import org.junit.runners.Parameterized.Parameters;
032
033/**
034 * Try out different combinations of row count and KeyValue count
035 */
036public abstract class TestWALEntryStreamDifferentCounts extends WALEntryStreamTestBase {
037
038  @Parameter(0)
039  public int nbRows;
040
041  @Parameter(1)
042  public int walEditKVs;
043
044  @Parameter(2)
045  public boolean isCompressionEnabled;
046
047  @Parameters(name = "{index}: nbRows={0}, walEditKVs={1}, isCompressionEnabled={2}")
048  public static Iterable<Object[]> data() {
049    List<Object[]> params = new ArrayList<>();
050    for (int nbRows : new int[] { 1500, 60000 }) {
051      for (int walEditKVs : new int[] { 1, 100 }) {
052        for (boolean isCompressionEnabled : new boolean[] { false, true }) {
053          params.add(new Object[] { nbRows, walEditKVs, isCompressionEnabled });
054        }
055      }
056    }
057    return params;
058  }
059
060  @Before
061  public void setUp() throws IOException {
062    CONF.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, isCompressionEnabled);
063    initWAL();
064  }
065
066  @Test
067  public void testDifferentCounts() throws Exception {
068    mvcc.advanceTo(1);
069
070    for (int i = 0; i < nbRows; i++) {
071      appendToLogAndSync(walEditKVs);
072    }
073
074    log.rollWriter();
075
076    try (WALEntryStream entryStream =
077      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
078      int i = 0;
079      while (entryStream.hasNext() == HasNext.YES) {
080        assertNotNull(entryStream.next());
081        i++;
082      }
083      assertEquals(nbRows, i);
084
085      // should've read all entries, and since the last file is still opened for writing so we will
086      // get a RETRY instead of NO here
087      assertEquals(HasNext.RETRY, entryStream.hasNext());
088    }
089  }
090}