/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DummyDFSOutputStream;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.MockedConstruction;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Make sure lease renewal works. Since it is in a background thread, normal read/write test can not
 * verify it.
 * <p>
 * See HBASE-28955 for more details.
 */
@Category({ MiscTests.class, MediumTests.class })
public class TestLeaseRenewal extends AsyncFSTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLeaseRenewal.class);

  private static DistributedFileSystem FS;
  private static EventLoopGroup EVENT_LOOP_GROUP;
  private static Class<? extends Channel> CHANNEL_CLASS;
  private static StreamSlowMonitor MONITOR;

  /**
   * Starts a mini DFS cluster (requested with 3 as its argument) and initializes the shared
   * file system handle, event loop group, channel class and slow-stream monitor used by every
   * test in this class.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    startMiniDFSCluster(3);
    FS = CLUSTER.getFileSystem();
    EVENT_LOOP_GROUP = new NioEventLoopGroup();
    CHANNEL_CLASS = NioSocketChannel.class;
    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
  }

  /**
   * Shuts down the event loop group (if it was created) and then the mini DFS cluster.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    try {
      if (EVENT_LOOP_GROUP != null) {
        EVENT_LOOP_GROUP.shutdownGracefully().get();
      }
    } finally {
      // Always stop the cluster, even when waiting for the event loop shutdown fails, so a
      // failure here does not leave the mini cluster running.
      shutdownMiniDFSCluster();
    }
  }

  /**
   * Opens a {@link FanOutOneBlockAsyncDFSOutput} for the given file on the test file system.
   * @param file path of the file to create
   * @return the newly created output
   * @throws IOException if the output can not be created
   */
  private FanOutOneBlockAsyncDFSOutput create(String file)
    throws IllegalArgumentException, IOException {
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    // Use the caller supplied path; every test passes its own file name.
    return FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path(file), true, false,
      (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
  }

  /**
   * Checks the value returned by {@link DFSClient#renewLease()} before an output is opened, while
   * one is open, and after {@code closeAllFilesBeingWritten} has been called, for both the close
   * ({@code false}) and abort ({@code true}) variants.
   */
  @Test
  public void testLeaseRenew() throws IOException {
    DFSClient client = FS.getClient();
    // No output has been opened by this test yet.
    assertFalse(client.renewLease());

    FanOutOneBlockAsyncDFSOutput out = create("/test_lease_renew");
    assertTrue(client.renewLease());
    client.closeAllFilesBeingWritten(false);
    assertTrue(out.isClosed());

    // The only output has been closed above.
    assertFalse(client.renewLease());

    out = create("/test_lease_renew");
    assertTrue(client.renewLease());
    client.closeAllFilesBeingWritten(true);
    assertTrue(out.isClosed());
  }

  /**
   * Looks up the public {@code getUniqKey} method on {@link DFSOutputStream} via reflection.
   * @return the method, or an empty {@link Optional} when the class does not declare it
   */
  private Optional<Method> getUniqKeyMethod() {
    try {
      return Optional.of(DFSOutputStream.class.getMethod("getUniqKey"));
    } catch (NoSuchMethodException e) {
      // should be hadoop 3.3 or below
      return Optional.empty();
    }
  }

  /**
   * Verifies which methods of the mocked {@link DummyDFSOutputStream} are invoked when the client
   * renews its lease, and that nothing else is called on it.
   */
  @Test
  public void testEnsureMethodsCalledWhenLeaseRenewal() throws Exception {
    try (MockedConstruction<DummyDFSOutputStream> mocked =
      mockConstruction(DummyDFSOutputStream.class)) {
      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_lease_renewal")) {
        // The first instance constructed while the mock is active.
        DummyDFSOutputStream dummy = mocked.constructed().get(0);
        assertTrue(FS.getClient().renewLease());
        Optional<Method> getUniqKeyMethod = getUniqKeyMethod();
        if (getUniqKeyMethod.isPresent()) {
          // Invoked reflectively because the methods may not exist at compile time.
          getUniqKeyMethod.get().invoke(verify(dummy));
          Method getNamespaceMethod = DFSOutputStream.class.getMethod("getNamespace");
          getNamespaceMethod.invoke(verify(dummy));
        } else {
          verify(dummy).getFileId();
        }
        verifyNoMoreInteractions(dummy);
      }
    }
  }

  /**
   * Verifies that the key accessor was called on the mock: {@code getUniqKey} when
   * {@link DFSOutputStream} has that method, {@code getFileId} otherwise.
   * @param dummy the mocked stream to verify against
   */
  private void verifyGetUniqKey(DummyDFSOutputStream dummy) throws Exception {
    Optional<Method> getUniqKeyMethod = getUniqKeyMethod();
    if (getUniqKeyMethod.isPresent()) {
      getUniqKeyMethod.get().invoke(verify(dummy));
    } else {
      verify(dummy).getFileId();
    }
  }

  /**
   * Verifies that {@code closeAllFilesBeingWritten(false)} calls {@code close()} on the mocked
   * {@link DummyDFSOutputStream}, besides the key accessor, and nothing else.
   */
  @Test
  public void testEnsureMethodsCalledWhenClosing() throws Exception {
    try (MockedConstruction<DummyDFSOutputStream> mocked =
      mockConstruction(DummyDFSOutputStream.class)) {
      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_closing")) {
        DummyDFSOutputStream dummy = mocked.constructed().get(0);
        verifyGetUniqKey(dummy);
        FS.getClient().closeAllFilesBeingWritten(false);
        verify(dummy).close();

        verifyNoMoreInteractions(dummy);
      }
    }
  }

  /**
   * Verifies that {@code closeAllFilesBeingWritten(true)} calls {@code abort()} on the mocked
   * {@link DummyDFSOutputStream}, besides the key accessor, and nothing else.
   */
  @Test
  public void testEnsureMethodsCalledWhenAborting() throws Exception {
    try (MockedConstruction<DummyDFSOutputStream> mocked =
      mockConstruction(DummyDFSOutputStream.class)) {
      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_aborting")) {
        DummyDFSOutputStream dummy = mocked.constructed().get(0);
        verifyGetUniqKey(dummy);
        FS.getClient().closeAllFilesBeingWritten(true);
        verify(dummy).abort();
        verifyNoMoreInteractions(dummy);
      }
    }
  }
}