/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ MasterTests.class, LargeTests.class })
public class TestWALProcedureStoreOnHDFS {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALProcedureStoreOnHDFS.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStoreOnHDFS.class);

  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private WALProcedureStore store;

  // Stop the store when it reports an unrecoverable failure.
  private ProcedureStore.ProcedureStoreListener stopProcedureListener =
    new ProcedureStore.ProcedureStoreListener() {
      @Override
      public void postSync() {
      }

      @Override
      public void abortProcess() {
        LOG.error(HBaseMarkers.FATAL, "Abort the Procedure Store");
        store.stop(true);
      }
    };

  @Before
  public void initConfig() {
    Configuration conf = UTIL.getConfiguration();

    conf.setInt("dfs.replication", 3);
    conf.setInt("dfs.namenode.replication.min", 3);

    // increase the value for slow test-env
    conf.setInt(WALProcedureStore.WAIT_BEFORE_ROLL_CONF_KEY, 1000);
    conf.setInt(WALProcedureStore.ROLL_RETRIES_CONF_KEY, 10);
    conf.setInt(WALProcedureStore.MAX_SYNC_FAILURE_ROLL_CONF_KEY, 10);
  }

  // No @Before because some tests need to do additional config first
  private void setupDFS() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
    CommonFSUtils.setWALRootDir(conf, new Path(conf.get("fs.defaultFS"), "/tmp/wal"));

    Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
    store = ProcedureTestingUtility.createWalStore(conf, logDir);
    store.registerListener(stopProcedureListener);
    store.start(8);
    store.recoverLease();
  }

  // No @After
  @SuppressWarnings("JUnit4TearDownNotRun")
  public void tearDown() throws Exception {
    store.stop(false);
    UTIL.getDFSCluster().getFileSystem().delete(store.getWALDir(), true);

    try {
      UTIL.shutdownMiniCluster();
    } catch (Exception e) {
      LOG.warn("failure shutting down cluster", e);
    }
  }

  // With only 2 of the 3 required replicas available, inserts should eventually
  // fail and the store should abort; the final insert is expected to throw.
  @Test(expected = RuntimeException.class)
  public void testWalAbortOnLowReplication() throws Exception {
    setupDFS();

    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());

    LOG.info("Stop DataNode");
    UTIL.getDFSCluster().stopDataNode(0);
    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());

    store.insert(new TestProcedure(1, -1), null);
    for (long i = 2; store.isRunning(); ++i) {
      assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
      store.insert(new TestProcedure(i, -1), null);
      Thread.sleep(100);
    }
    assertFalse(store.isRunning());
  }

  // Queue up more writers than sync slots before killing a DataNode:
  // at least store.getNumThreads() inserts should fail, but not all of them.
  @Test
  public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
    setupDFS();

    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
    store.registerListener(new ProcedureStore.ProcedureStoreListener() {
      @Override
      public void postSync() {
        Threads.sleepWithoutInterrupt(2000);
      }

      @Override
      public void abortProcess() {
      }
    });

    final AtomicInteger reCount = new AtomicInteger(0);
    Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
    for (int i = 0; i < thread.length; ++i) {
      final long procId = i + 1L;
      thread[i] = new Thread(() -> {
        try {
          LOG.debug("[S] INSERT " + procId);
          store.insert(new TestProcedure(procId, -1), null);
          LOG.debug("[E] INSERT " + procId);
        } catch (RuntimeException e) {
          reCount.incrementAndGet();
          LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
        }
      });
      thread[i].start();
    }

    Thread.sleep(1000);
    LOG.info("Stop DataNode");
    UTIL.getDFSCluster().stopDataNode(0);
    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());

    for (int i = 0; i < thread.length; ++i) {
      thread[i].join();
    }

    assertFalse(store.isRunning());
    assertTrue(reCount.toString(),
      reCount.get() >= store.getNumThreads() && reCount.get() < thread.length);
  }

  // With min replication lowered to 1, the store should keep rolling the WAL
  // and stay running while DataNodes are restarted.
  @Test
  public void testWalRollOnLowReplication() throws Exception {
    UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1);
    setupDFS();

    int dnCount = 0;
    store.insert(new TestProcedure(1, -1), null);
    UTIL.getDFSCluster().restartDataNode(dnCount);
    for (long i = 2; i < 100; ++i) {
      store.insert(new TestProcedure(i, -1), null);
      waitForNumReplicas(3);
      Thread.sleep(100);
      if ((i % 30) == 0) {
        LOG.info("Restart Data Node");
        UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
      }
    }
    assertTrue(store.isRunning());
  }

  // Wait until the cluster has numReplicas DataNodes and they are all fully started.
  public void waitForNumReplicas(int numReplicas) throws Exception {
    while (UTIL.getDFSCluster().getDataNodes().size() < numReplicas) {
      Thread.sleep(100);
    }

    for (int i = 0; i < numReplicas; ++i) {
      for (DataNode dn : UTIL.getDFSCluster().getDataNodes()) {
        while (!dn.isDatanodeFullyStarted()) {
          Thread.sleep(100);
        }
      }
    }
  }
}