/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertFalse;

import java.io.IOException;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test many concurrent appenders to a WAL while rolling the log.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestLogRollingNoCluster {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLogRollingNoCluster.class);

  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
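  // 1KB of zeros used as the value of every appended edit, to give each append some weight.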
  private final static byte[] EMPTY_1K_ARRAY = new byte[1024];
  private static final int NUM_THREADS = 100; // Spin up this many threads
  private static final int NUM_ENTRIES = 100; // How many entries to write per thread

  /** ProtobufLogWriter that simulates higher latencies in the sync() call */
  public static class HighLatencySyncWriter extends ProtobufLogWriter {
    @Override
    public void sync(boolean forceSync) throws IOException {
      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
      super.sync(forceSync);
      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
    }
  }

  /**
   * Spin up a bunch of threads and have them all append to a WAL. Roll the WAL frequently to try
   * to trigger an NPE.
   */
  @Test
  public void testContendedLogRolling() throws Exception {
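    // A three-datanode mini DFS so WAL writes go through a real HDFS write pipeline.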
    TEST_UTIL.startMiniDFSCluster(3);
    Path dir = TEST_UTIL.getDataTestDirOnTestFS();

    // The implementation needs to know the 'handler' count.
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
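    // Work on a copy of the configuration so the WAL-specific settings below do not leak
    // back into TEST_UTIL's shared configuration.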
    final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    CommonFSUtils.setRootDir(conf, dir);
    FSTableDescriptors fsTableDescriptors = new FSTableDescriptors(TEST_UTIL.getConfiguration());
    FSTableDescriptors.tryUpdateMetaTableDescriptor(TEST_UTIL.getConfiguration());
    TableDescriptor metaTableDescriptor = fsTableDescriptors.get(TableName.META_TABLE_NAME);
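    // Plug in the high-latency writer so syncs are slow enough to overlap with concurrent
    // log rolls, widening the race window this test is trying to hit.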
    conf.set(FSHLogProvider.WRITER_IMPL, HighLatencySyncWriter.class.getName());
    final WALFactory wals = new WALFactory(conf, TestLogRollingNoCluster.class.getName());
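    // With the 'filesystem' provider there is a single WAL; passing null simply selects it.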
    final WAL wal = wals.getWAL(null);

    final int numThreads = NUM_THREADS;
    Appender[] appenders = new Appender[numThreads];
    try {
      for (int i = 0; i < numThreads; i++) {
        // Have each appending thread write 'count' entries
        appenders[i] = new Appender(metaTableDescriptor, wal, i, NUM_ENTRIES);
      }
      for (int i = 0; i < numThreads; i++) {
        appenders[i].start();
      }
      for (int i = 0; i < numThreads; i++) {
        // ensure that all threads are joined before closing the wal
        appenders[i].join();
      }
    } finally {
      wals.close();
    }
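    // Each appender records the exception it caught, if any; fail the test if any thread saw one.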
    for (int i = 0; i < numThreads; i++) {
      assertFalse("Error: " + appenders[i].getException(), appenders[i].isException());
    }
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  /**
   * Appender thread. Appends edits to the passed WAL.
   */
  static class Appender extends Thread {
    private final Logger log;
    private final WAL wal;
    private final int count;
    private Exception e = null;
    private final TableDescriptor metaTableDescriptor;

    Appender(TableDescriptor metaTableDescriptor, final WAL wal, final int index, final int count) {
      super("" + index);
      this.wal = wal;
      this.count = count;
      this.metaTableDescriptor = metaTableDescriptor;
      this.log = LoggerFactory.getLogger("Appender:" + getName());
    }

    /** Returns whether this thread caught an exception. Only meaningful once the thread is done. */
    boolean isException() {
      return !isAlive() && this.e != null;
    }

    Exception getException() {
      return this.e;
    }

    @Override
    public void run() {
      this.log.info(getName() + " started");
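      // Each appender gets its own MVCC instance; the test only exercises WAL
      // append/sync/roll interleavings, not cross-thread read visibility.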
      final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
      try {
        for (int i = 0; i < this.count; i++) {
          long now = EnvironmentEdgeManager.currentTime();
          // Roll every ten edits
          if (i % 10 == 0) {
            this.wal.rollWriter();
          }
          WALEdit edit = new WALEdit();
          byte[] bytes = Bytes.toBytes(i);
          WALEditInternalHelper.addExtendedCell(edit,
            new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
          RegionInfo hri = RegionInfoBuilder.FIRST_META_REGIONINFO;
          NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
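          // Replication scope 0 (local) for every meta column family; nothing is replicated.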
          for (byte[] fam : this.metaTableDescriptor.getColumnFamilyNames()) {
            scopes.put(fam, 0);
          }
          final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
            TableName.META_TABLE_NAME, now, mvcc, scopes), edit);
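          // A short random pause between append and sync makes it more likely that a
          // concurrent roll lands between the two.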
          Threads.sleep(ThreadLocalRandom.current().nextInt(5));
          wal.sync(txid);
        }
        String msg = getName() + " finished";
        // isException() cannot be used here; isAlive() is still true on the current thread.
        if (this.e != null) {
          this.log.info(msg, this.e);
        } else {
          this.log.info(msg);
        }
      } catch (Exception e) {
        this.e = e;
        log.info("Caught exception from Appender:" + getName(), e);
      } finally {
        // Call sync on our log on the way out, else other threads just hang waiting
        // on our outstanding appends.
        try {
          this.wal.sync();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }

  // @org.junit.Rule
  // public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
  // new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}