001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.mockito.Mockito.mock; 021 022import java.io.IOException; 023import java.util.NavigableMap; 024import java.util.TreeMap; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseTestingUtility; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.KeyValue; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.RegionInfo; 033import org.apache.hadoop.hbase.client.RegionInfoBuilder; 034import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 035import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.hadoop.hbase.wal.WAL; 039import org.apache.hadoop.hbase.wal.WALEdit; 040import org.apache.hadoop.hbase.wal.WALFactory; 041import org.apache.hadoop.hbase.wal.WALKeyImpl; 042import org.apache.hadoop.hdfs.DistributedFileSystem; 043import org.apache.hadoop.hdfs.MiniDFSCluster; 044import org.junit.After; 045import org.junit.AfterClass; 046import org.junit.Rule; 047import org.junit.rules.TestName; 048 049import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 050 051/** 052 * Base class for WALEntryStream tests. 053 */ 054public abstract class WALEntryStreamTestBase { 055 056 protected static final long TEST_TIMEOUT_MS = 5000; 057 protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();; 058 protected static Configuration CONF; 059 protected static DistributedFileSystem fs; 060 protected static MiniDFSCluster cluster; 061 protected static final TableName tableName = TableName.valueOf("tablename"); 062 protected static final byte[] family = Bytes.toBytes("column"); 063 protected static final byte[] qualifier = Bytes.toBytes("qualifier"); 064 protected static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName) 065 .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build(); 066 protected static final NavigableMap<byte[], Integer> scopes = getScopes(); 067 protected final String fakeWalGroupId = "fake-wal-group-id"; 068 069 /** 070 * Test helper that waits until a non-null entry is available in the stream next or times out. A 071 * {@link WALEntryStream} provides a streaming access to a queue of log files. Since the stream 072 * can be consumed as the file is being written, callers relying on {@link WALEntryStream#next()} 073 * may need to retry multiple times before an entry appended to the WAL is visible to the stream 074 * consumers. One such cause of delay is the close() of writer writing these log files. While the 075 * closure is in progress, the stream does not switch to the next log in the queue and next() may 076 * return null entries. This utility wraps these retries into a single next call and that makes 077 * the test code simpler. 078 */ 079 protected static class WALEntryStreamWithRetries extends WALEntryStream { 080 081 private boolean retry = true; 082 083 public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, FileSystem fs, 084 Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider, 085 MetricsSource metrics, String walGroupId) { 086 super(logQueue, fs, conf, startPosition, walFileLengthProvider, metrics, walGroupId); 087 } 088 089 public void enableRetry() { 090 retry = true; 091 } 092 093 public void disableRetry() { 094 retry = false; 095 } 096 097 @Override 098 public HasNext hasNext() { 099 // hasNext is idempotent, so we can call it again and do not need to store its return value 100 if (retry) { 101 TEST_UTIL.waitFor(TEST_TIMEOUT_MS, () -> super.hasNext() == HasNext.YES); 102 } 103 return super.hasNext(); 104 } 105 } 106 107 private static NavigableMap<byte[], Integer> getScopes() { 108 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 109 scopes.put(family, 1); 110 return scopes; 111 } 112 113 class PathWatcher implements WALActionsListener { 114 115 Path currentPath; 116 117 @Override 118 public void preLogRoll(Path oldPath, Path newPath) { 119 logQueue.enqueueLog(newPath, fakeWalGroupId); 120 currentPath = newPath; 121 } 122 } 123 124 protected WAL log; 125 protected ReplicationSourceLogQueue logQueue; 126 protected PathWatcher pathWatcher; 127 128 @Rule 129 public TestName tn = new TestName(); 130 protected final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 131 132 protected static void startCluster() throws Exception { 133 CONF = TEST_UTIL.getConfiguration(); 134 CONF.setLong("replication.source.sleepforretries", 10); 135 TEST_UTIL.startMiniDFSCluster(3); 136 137 cluster = TEST_UTIL.getDFSCluster(); 138 fs = cluster.getFileSystem(); 139 } 140 141 @AfterClass 142 public static void tearDownAfterClass() throws Exception { 143 TEST_UTIL.shutdownMiniCluster(); 144 } 145 146 protected void initWAL() throws IOException { 147 ReplicationSource source = mock(ReplicationSource.class); 148 MetricsSource metricsSource = new MetricsSource("2"); 149 // Source with the same id is shared and carries values from the last run 150 metricsSource.clear(); 151 logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source); 152 pathWatcher = new PathWatcher(); 153 final WALFactory wals = new WALFactory(CONF, tn.getMethodName().replaceAll("[\\[:]", "_")); 154 wals.getWALProvider().addWALActionsListener(pathWatcher); 155 log = wals.getWAL(info); 156 } 157 158 @After 159 public void tearDown() throws Exception { 160 Closeables.close(log, true); 161 } 162 163 protected void appendToLogAndSync() throws IOException { 164 appendToLogAndSync(1); 165 } 166 167 protected void appendToLogAndSync(int count) throws IOException { 168 long txid = appendToLog(count); 169 log.sync(txid); 170 } 171 172 protected long appendToLog(int count) throws IOException { 173 return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 174 EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdits(count)); 175 } 176 177 protected WALEdit getWALEdits(int count) { 178 WALEdit edit = new WALEdit(); 179 for (int i = 0; i < count; i++) { 180 edit.add(new KeyValue(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), family, qualifier, 181 EnvironmentEdgeManager.currentTime(), qualifier)); 182 } 183 return edit; 184 } 185}