001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.mockito.Mockito.mock; 021 022import java.io.IOException; 023import java.util.NavigableMap; 024import java.util.TreeMap; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseTestingUtil; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.KeyValue; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.RegionInfo; 033import org.apache.hadoop.hbase.client.RegionInfoBuilder; 034import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 035import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.hadoop.hbase.wal.WAL; 039import org.apache.hadoop.hbase.wal.WALEdit; 040import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 041import org.apache.hadoop.hbase.wal.WALFactory; 042import org.apache.hadoop.hbase.wal.WALKeyImpl; 043import org.apache.hadoop.hdfs.DistributedFileSystem; 044import org.apache.hadoop.hdfs.MiniDFSCluster; 045import org.junit.After; 046import org.junit.AfterClass; 047import org.junit.Rule; 048import org.junit.rules.TestName; 049 050import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 051 052/** 053 * Base class for WALEntryStream tests. 054 */ 055public abstract class WALEntryStreamTestBase { 056 057 protected static final long TEST_TIMEOUT_MS = 5000; 058 protected static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();; 059 protected static Configuration CONF; 060 protected static DistributedFileSystem fs; 061 protected static MiniDFSCluster cluster; 062 protected static final TableName tableName = TableName.valueOf("tablename"); 063 protected static final byte[] family = Bytes.toBytes("column"); 064 protected static final byte[] qualifier = Bytes.toBytes("qualifier"); 065 protected static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName) 066 .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build(); 067 protected static final NavigableMap<byte[], Integer> scopes = getScopes(); 068 protected final String fakeWalGroupId = "fake-wal-group-id"; 069 070 /** 071 * Test helper that waits until a non-null entry is available in the stream next or times out. A 072 * {@link WALEntryStream} provides a streaming access to a queue of log files. Since the stream 073 * can be consumed as the file is being written, callers relying on {@link WALEntryStream#next()} 074 * may need to retry multiple times before an entry appended to the WAL is visible to the stream 075 * consumers. One such cause of delay is the close() of writer writing these log files. While the 076 * closure is in progress, the stream does not switch to the next log in the queue and next() may 077 * return null entries. This utility wraps these retries into a single next call and that makes 078 * the test code simpler. 079 */ 080 protected static class WALEntryStreamWithRetries extends WALEntryStream { 081 082 private boolean retry = true; 083 084 public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, FileSystem fs, 085 Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider, 086 MetricsSource metrics, String walGroupId) { 087 super(logQueue, fs, conf, startPosition, walFileLengthProvider, metrics, walGroupId); 088 } 089 090 public void enableRetry() { 091 retry = true; 092 } 093 094 public void disableRetry() { 095 retry = false; 096 } 097 098 @Override 099 public HasNext hasNext() { 100 // hasNext is idempotent, so we can call it again and do not need to store its return value 101 if (retry) { 102 TEST_UTIL.waitFor(TEST_TIMEOUT_MS, () -> super.hasNext() == HasNext.YES); 103 } 104 return super.hasNext(); 105 } 106 } 107 108 private static NavigableMap<byte[], Integer> getScopes() { 109 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 110 scopes.put(family, 1); 111 return scopes; 112 } 113 114 class PathWatcher implements WALActionsListener { 115 116 Path currentPath; 117 118 @Override 119 public void preLogRoll(Path oldPath, Path newPath) { 120 logQueue.enqueueLog(newPath, fakeWalGroupId); 121 currentPath = newPath; 122 } 123 } 124 125 protected WAL log; 126 protected ReplicationSourceLogQueue logQueue; 127 protected PathWatcher pathWatcher; 128 129 @Rule 130 public TestName tn = new TestName(); 131 protected final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 132 133 protected static void startCluster() throws Exception { 134 CONF = TEST_UTIL.getConfiguration(); 135 CONF.setLong("replication.source.sleepforretries", 10); 136 TEST_UTIL.startMiniDFSCluster(3); 137 138 cluster = TEST_UTIL.getDFSCluster(); 139 fs = cluster.getFileSystem(); 140 } 141 142 @AfterClass 143 public static void tearDownAfterClass() throws Exception { 144 TEST_UTIL.shutdownMiniCluster(); 145 } 146 147 protected void initWAL() throws IOException { 148 ReplicationSource source = mock(ReplicationSource.class); 149 MetricsSource metricsSource = new MetricsSource("2"); 150 // Source with the same id is shared and carries values from the last run 151 metricsSource.clear(); 152 logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source); 153 pathWatcher = new PathWatcher(); 154 final WALFactory wals = new WALFactory(CONF, tn.getMethodName().replaceAll("[\\[:]", "_")); 155 wals.getWALProvider().addWALActionsListener(pathWatcher); 156 log = wals.getWAL(info); 157 } 158 159 @After 160 public void tearDown() throws Exception { 161 Closeables.close(log, true); 162 } 163 164 protected void appendToLogAndSync() throws IOException { 165 appendToLogAndSync(1); 166 } 167 168 protected void appendToLogAndSync(int count) throws IOException { 169 long txid = appendToLog(count); 170 log.sync(txid); 171 } 172 173 protected long appendToLog(int count) throws IOException { 174 return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 175 EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdits(count)); 176 } 177 178 protected WALEdit getWALEdits(int count) { 179 WALEdit edit = new WALEdit(); 180 for (int i = 0; i < count; i++) { 181 WALEditInternalHelper.addExtendedCell(edit, 182 new KeyValue(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), family, qualifier, 183 EnvironmentEdgeManager.currentTime(), qualifier)); 184 } 185 return edit; 186 } 187}