001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertFalse; 021 022import java.io.IOException; 023import java.util.NavigableMap; 024import java.util.TreeMap; 025import java.util.concurrent.ThreadLocalRandom; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtil; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.TableDescriptors; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.RegionInfo; 035import org.apache.hadoop.hbase.client.RegionInfoBuilder; 036import org.apache.hadoop.hbase.client.TableDescriptor; 037import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 038import org.apache.hadoop.hbase.testclassification.MediumTests; 039import org.apache.hadoop.hbase.testclassification.RegionServerTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.CommonFSUtils; 042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 043import org.apache.hadoop.hbase.util.FSTableDescriptors; 044import org.apache.hadoop.hbase.util.Threads; 045import org.apache.hadoop.hbase.wal.FSHLogProvider; 046import org.apache.hadoop.hbase.wal.WAL; 047import org.apache.hadoop.hbase.wal.WALEdit; 048import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 049import org.apache.hadoop.hbase.wal.WALFactory; 050import org.apache.hadoop.hbase.wal.WALKeyImpl; 051import org.junit.ClassRule; 052import org.junit.Test; 053import org.junit.experimental.categories.Category; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057/** 058 * Test many concurrent appenders to an WAL while rolling the log. 059 */ 060@Category({ RegionServerTests.class, MediumTests.class }) 061public class TestLogRollingNoCluster { 062 063 @ClassRule 064 public static final HBaseClassTestRule CLASS_RULE = 065 HBaseClassTestRule.forClass(TestLogRollingNoCluster.class); 066 067 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 068 private final static byte[] EMPTY_1K_ARRAY = new byte[1024]; 069 private static final int NUM_THREADS = 100; // Spin up this many threads 070 private static final int NUM_ENTRIES = 100; // How many entries to write 071 072 /** ProtobufLogWriter that simulates higher latencies in sync() call */ 073 public static class HighLatencySyncWriter extends ProtobufLogWriter { 074 @Override 075 public void sync(boolean forceSync) throws IOException { 076 Threads.sleep(ThreadLocalRandom.current().nextInt(10)); 077 super.sync(forceSync); 078 Threads.sleep(ThreadLocalRandom.current().nextInt(10)); 079 } 080 } 081 082 /** 083 * Spin up a bunch of threads and have them all append to a WAL. Roll the WAL frequently to try 084 * and trigger NPE. 085 */ 086 @Test 087 public void testContendedLogRolling() throws Exception { 088 TEST_UTIL.startMiniDFSCluster(3); 089 Path dir = TEST_UTIL.getDataTestDirOnTestFS(); 090 091 // The implementation needs to know the 'handler' count. 092 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS); 093 final Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 094 conf.set(WALFactory.WAL_PROVIDER, "filesystem"); 095 CommonFSUtils.setRootDir(conf, dir); 096 FSTableDescriptors fsTableDescriptors = new FSTableDescriptors(TEST_UTIL.getConfiguration()); 097 FSTableDescriptors.tryUpdateMetaTableDescriptor(TEST_UTIL.getConfiguration()); 098 TableDescriptor metaTableDescriptor = fsTableDescriptors.get(TableName.META_TABLE_NAME); 099 conf.set(FSHLogProvider.WRITER_IMPL, HighLatencySyncWriter.class.getName()); 100 final WALFactory wals = new WALFactory(conf, TestLogRollingNoCluster.class.getName()); 101 final WAL wal = wals.getWAL(null); 102 103 Appender[] appenders = null; 104 105 final int numThreads = NUM_THREADS; 106 appenders = new Appender[numThreads]; 107 try { 108 for (int i = 0; i < numThreads; i++) { 109 // Have each appending thread write 'count' entries 110 appenders[i] = new Appender(metaTableDescriptor, wal, i, NUM_ENTRIES); 111 } 112 for (int i = 0; i < numThreads; i++) { 113 appenders[i].start(); 114 } 115 for (int i = 0; i < numThreads; i++) { 116 // ensure that all threads are joined before closing the wal 117 appenders[i].join(); 118 } 119 } finally { 120 wals.close(); 121 } 122 for (int i = 0; i < numThreads; i++) { 123 assertFalse("Error: " + appenders[i].getException(), appenders[i].isException()); 124 } 125 TEST_UTIL.shutdownMiniDFSCluster(); 126 } 127 128 /** 129 * Appender thread. Appends to passed wal file. 130 */ 131 static class Appender extends Thread { 132 private final Logger log; 133 private final WAL wal; 134 private final int count; 135 private Exception e = null; 136 private final TableDescriptor metaTableDescriptor; 137 138 Appender(TableDescriptor metaTableDescriptor, final WAL wal, final int index, final int count) { 139 super("" + index); 140 this.wal = wal; 141 this.count = count; 142 this.metaTableDescriptor = metaTableDescriptor; 143 this.log = LoggerFactory.getLogger("Appender:" + getName()); 144 } 145 146 /** Returns Call when the thread is done. */ 147 boolean isException() { 148 return !isAlive() && this.e != null; 149 } 150 151 Exception getException() { 152 return this.e; 153 } 154 155 @Override 156 public void run() { 157 this.log.info(getName() + " started"); 158 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 159 try { 160 TableDescriptors tds = new FSTableDescriptors(TEST_UTIL.getConfiguration()); 161 FSTableDescriptors.tryUpdateMetaTableDescriptor(TEST_UTIL.getConfiguration()); 162 TableDescriptor htd = tds.get(TableName.META_TABLE_NAME); 163 for (int i = 0; i < this.count; i++) { 164 long now = EnvironmentEdgeManager.currentTime(); 165 // Roll every ten edits 166 if (i % 10 == 0) { 167 this.wal.rollWriter(); 168 } 169 WALEdit edit = new WALEdit(); 170 byte[] bytes = Bytes.toBytes(i); 171 WALEditInternalHelper.addExtendedCell(edit, 172 new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY)); 173 RegionInfo hri = RegionInfoBuilder.FIRST_META_REGIONINFO; 174 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 175 for (byte[] fam : this.metaTableDescriptor.getColumnFamilyNames()) { 176 scopes.put(fam, 0); 177 } 178 final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), 179 TableName.META_TABLE_NAME, now, mvcc, scopes), edit); 180 Threads.sleep(ThreadLocalRandom.current().nextInt(5)); 181 wal.sync(txid); 182 } 183 String msg = getName() + " finished"; 184 if (isException()) this.log.info(msg, getException()); 185 else this.log.info(msg); 186 } catch (Exception e) { 187 this.e = e; 188 log.info("Caught exception from Appender:" + getName(), e); 189 } finally { 190 // Call sync on our log.else threads just hang out. 191 try { 192 this.wal.sync(); 193 } catch (IOException e) { 194 throw new RuntimeException(e); 195 } 196 } 197 } 198 } 199 200 // @org.junit.Rule 201 // public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = 202 // new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); 203}