001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.mockito.Mockito.mock;
021
022import java.io.IOException;
023import java.util.NavigableMap;
024import java.util.TreeMap;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseTestingUtility;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.KeyValue;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.RegionInfo;
033import org.apache.hadoop.hbase.client.RegionInfoBuilder;
034import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
035import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.wal.WAL;
039import org.apache.hadoop.hbase.wal.WALEdit;
040import org.apache.hadoop.hbase.wal.WALFactory;
041import org.apache.hadoop.hbase.wal.WALKeyImpl;
042import org.apache.hadoop.hdfs.DistributedFileSystem;
043import org.apache.hadoop.hdfs.MiniDFSCluster;
044import org.junit.After;
045import org.junit.AfterClass;
046import org.junit.Rule;
047import org.junit.rules.TestName;
048
049import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
050
051/**
052 * Base class for WALEntryStream tests.
053 */
054public abstract class WALEntryStreamTestBase {
055
056  protected static final long TEST_TIMEOUT_MS = 5000;
057  protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();;
058  protected static Configuration CONF;
059  protected static DistributedFileSystem fs;
060  protected static MiniDFSCluster cluster;
061  protected static final TableName tableName = TableName.valueOf("tablename");
062  protected static final byte[] family = Bytes.toBytes("column");
063  protected static final byte[] qualifier = Bytes.toBytes("qualifier");
064  protected static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
065    .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
066  protected static final NavigableMap<byte[], Integer> scopes = getScopes();
067  protected final String fakeWalGroupId = "fake-wal-group-id";
068
069  /**
070   * Test helper that waits until a non-null entry is available in the stream next or times out. A
071   * {@link WALEntryStream} provides a streaming access to a queue of log files. Since the stream
072   * can be consumed as the file is being written, callers relying on {@link WALEntryStream#next()}
073   * may need to retry multiple times before an entry appended to the WAL is visible to the stream
074   * consumers. One such cause of delay is the close() of writer writing these log files. While the
075   * closure is in progress, the stream does not switch to the next log in the queue and next() may
076   * return null entries. This utility wraps these retries into a single next call and that makes
077   * the test code simpler.
078   */
079  protected static class WALEntryStreamWithRetries extends WALEntryStream {
080
081    private boolean retry = true;
082
083    public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, FileSystem fs,
084      Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider,
085      MetricsSource metrics, String walGroupId) {
086      super(logQueue, fs, conf, startPosition, walFileLengthProvider, metrics, walGroupId);
087    }
088
089    public void enableRetry() {
090      retry = true;
091    }
092
093    public void disableRetry() {
094      retry = false;
095    }
096
097    @Override
098    public HasNext hasNext() {
099      // hasNext is idempotent, so we can call it again and do not need to store its return value
100      if (retry) {
101        TEST_UTIL.waitFor(TEST_TIMEOUT_MS, () -> super.hasNext() == HasNext.YES);
102      }
103      return super.hasNext();
104    }
105  }
106
107  private static NavigableMap<byte[], Integer> getScopes() {
108    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
109    scopes.put(family, 1);
110    return scopes;
111  }
112
113  class PathWatcher implements WALActionsListener {
114
115    Path currentPath;
116
117    @Override
118    public void preLogRoll(Path oldPath, Path newPath) {
119      logQueue.enqueueLog(newPath, fakeWalGroupId);
120      currentPath = newPath;
121    }
122  }
123
124  protected WAL log;
125  protected ReplicationSourceLogQueue logQueue;
126  protected PathWatcher pathWatcher;
127
128  @Rule
129  public TestName tn = new TestName();
130  protected final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
131
132  protected static void startCluster() throws Exception {
133    CONF = TEST_UTIL.getConfiguration();
134    CONF.setLong("replication.source.sleepforretries", 10);
135    TEST_UTIL.startMiniDFSCluster(3);
136
137    cluster = TEST_UTIL.getDFSCluster();
138    fs = cluster.getFileSystem();
139  }
140
141  @AfterClass
142  public static void tearDownAfterClass() throws Exception {
143    TEST_UTIL.shutdownMiniCluster();
144  }
145
146  protected void initWAL() throws IOException {
147    ReplicationSource source = mock(ReplicationSource.class);
148    MetricsSource metricsSource = new MetricsSource("2");
149    // Source with the same id is shared and carries values from the last run
150    metricsSource.clear();
151    logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source);
152    pathWatcher = new PathWatcher();
153    final WALFactory wals = new WALFactory(CONF, tn.getMethodName().replaceAll("[\\[:]", "_"));
154    wals.getWALProvider().addWALActionsListener(pathWatcher);
155    log = wals.getWAL(info);
156  }
157
158  @After
159  public void tearDown() throws Exception {
160    Closeables.close(log, true);
161  }
162
163  protected void appendToLogAndSync() throws IOException {
164    appendToLogAndSync(1);
165  }
166
167  protected void appendToLogAndSync(int count) throws IOException {
168    long txid = appendToLog(count);
169    log.sync(txid);
170  }
171
172  protected long appendToLog(int count) throws IOException {
173    return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
174      EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdits(count));
175  }
176
177  protected WALEdit getWALEdits(int count) {
178    WALEdit edit = new WALEdit();
179    for (int i = 0; i < count; i++) {
180      edit.add(new KeyValue(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), family, qualifier,
181        EnvironmentEdgeManager.currentTime(), qualifier));
182    }
183    return edit;
184  }
185}