001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertFalse; 021 022import java.io.IOException; 023import java.util.NavigableMap; 024import java.util.TreeMap; 025import java.util.concurrent.ThreadLocalRandom; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.TableDescriptors; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.RegionInfo; 035import org.apache.hadoop.hbase.client.RegionInfoBuilder; 036import org.apache.hadoop.hbase.client.TableDescriptor; 037import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 038import org.apache.hadoop.hbase.testclassification.MediumTests; 039import org.apache.hadoop.hbase.testclassification.RegionServerTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.CommonFSUtils; 042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 043import org.apache.hadoop.hbase.util.FSTableDescriptors; 044import org.apache.hadoop.hbase.util.Threads; 045import org.apache.hadoop.hbase.wal.FSHLogProvider; 046import org.apache.hadoop.hbase.wal.WAL; 047import org.apache.hadoop.hbase.wal.WALEdit; 048import org.apache.hadoop.hbase.wal.WALFactory; 049import org.apache.hadoop.hbase.wal.WALKeyImpl; 050import org.junit.ClassRule; 051import org.junit.Test; 052import org.junit.experimental.categories.Category; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056/** 057 * Test many concurrent appenders to an WAL while rolling the log. 058 */ 059@Category({ RegionServerTests.class, MediumTests.class }) 060public class TestLogRollingNoCluster { 061 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestLogRollingNoCluster.class); 065 066 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 067 private final static byte[] EMPTY_1K_ARRAY = new byte[1024]; 068 private static final int NUM_THREADS = 100; // Spin up this many threads 069 private static final int NUM_ENTRIES = 100; // How many entries to write 070 071 /** ProtobufLogWriter that simulates higher latencies in sync() call */ 072 public static class HighLatencySyncWriter extends ProtobufLogWriter { 073 @Override 074 public void sync(boolean forceSync) throws IOException { 075 Threads.sleep(ThreadLocalRandom.current().nextInt(10)); 076 super.sync(forceSync); 077 Threads.sleep(ThreadLocalRandom.current().nextInt(10)); 078 } 079 } 080 081 /** 082 * Spin up a bunch of threads and have them all append to a WAL. Roll the WAL frequently to try 083 * and trigger NPE. 084 */ 085 @Test 086 public void testContendedLogRolling() throws Exception { 087 TEST_UTIL.startMiniDFSCluster(3); 088 Path dir = TEST_UTIL.getDataTestDirOnTestFS(); 089 090 // The implementation needs to know the 'handler' count. 091 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS); 092 final Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 093 conf.set(WALFactory.WAL_PROVIDER, "filesystem"); 094 CommonFSUtils.setRootDir(conf, dir); 095 FSTableDescriptors fsTableDescriptors = new FSTableDescriptors(TEST_UTIL.getConfiguration()); 096 FSTableDescriptors.tryUpdateMetaTableDescriptor(TEST_UTIL.getConfiguration()); 097 TableDescriptor metaTableDescriptor = fsTableDescriptors.get(TableName.META_TABLE_NAME); 098 conf.set(FSHLogProvider.WRITER_IMPL, HighLatencySyncWriter.class.getName()); 099 final WALFactory wals = new WALFactory(conf, TestLogRollingNoCluster.class.getName()); 100 final WAL wal = wals.getWAL(null); 101 102 Appender[] appenders = null; 103 104 final int numThreads = NUM_THREADS; 105 appenders = new Appender[numThreads]; 106 try { 107 for (int i = 0; i < numThreads; i++) { 108 // Have each appending thread write 'count' entries 109 appenders[i] = new Appender(metaTableDescriptor, wal, i, NUM_ENTRIES); 110 } 111 for (int i = 0; i < numThreads; i++) { 112 appenders[i].start(); 113 } 114 for (int i = 0; i < numThreads; i++) { 115 // ensure that all threads are joined before closing the wal 116 appenders[i].join(); 117 } 118 } finally { 119 wals.close(); 120 } 121 for (int i = 0; i < numThreads; i++) { 122 assertFalse("Error: " + appenders[i].getException(), appenders[i].isException()); 123 } 124 TEST_UTIL.shutdownMiniDFSCluster(); 125 } 126 127 /** 128 * Appender thread. Appends to passed wal file. 129 */ 130 static class Appender extends Thread { 131 private final Logger log; 132 private final WAL wal; 133 private final int count; 134 private Exception e = null; 135 private final TableDescriptor metaTableDescriptor; 136 137 Appender(TableDescriptor metaTableDescriptor, final WAL wal, final int index, final int count) { 138 super("" + index); 139 this.wal = wal; 140 this.count = count; 141 this.metaTableDescriptor = metaTableDescriptor; 142 this.log = LoggerFactory.getLogger("Appender:" + getName()); 143 } 144 145 /** Returns Call when the thread is done. */ 146 boolean isException() { 147 return !isAlive() && this.e != null; 148 } 149 150 Exception getException() { 151 return this.e; 152 } 153 154 @Override 155 public void run() { 156 this.log.info(getName() + " started"); 157 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 158 try { 159 TableDescriptors tds = new FSTableDescriptors(TEST_UTIL.getConfiguration()); 160 TableDescriptor htd = tds.get(TableName.META_TABLE_NAME); 161 for (int i = 0; i < this.count; i++) { 162 long now = EnvironmentEdgeManager.currentTime(); 163 // Roll every ten edits 164 if (i % 10 == 0) { 165 this.wal.rollWriter(); 166 } 167 WALEdit edit = new WALEdit(); 168 byte[] bytes = Bytes.toBytes(i); 169 edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY)); 170 RegionInfo hri = RegionInfoBuilder.FIRST_META_REGIONINFO; 171 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 172 for (byte[] fam : this.metaTableDescriptor.getColumnFamilyNames()) { 173 scopes.put(fam, 0); 174 } 175 final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), 176 TableName.META_TABLE_NAME, now, mvcc, scopes), edit); 177 Threads.sleep(ThreadLocalRandom.current().nextInt(5)); 178 wal.sync(txid); 179 } 180 String msg = getName() + " finished"; 181 if (isException()) this.log.info(msg, getException()); 182 else this.log.info(msg); 183 } catch (Exception e) { 184 this.e = e; 185 log.info("Caught exception from Appender:" + getName(), e); 186 } finally { 187 // Call sync on our log.else threads just hang out. 188 try { 189 this.wal.sync(); 190 } catch (IOException e) { 191 throw new RuntimeException(e); 192 } 193 } 194 } 195 } 196 197 // @org.junit.Rule 198 // public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = 199 // new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); 200}