001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.Mockito.mockConstruction;
023import static org.mockito.Mockito.verify;
024import static org.mockito.Mockito.verifyNoMoreInteractions;
025
026import java.io.IOException;
027import java.lang.reflect.Method;
028import java.util.Optional;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
032import org.apache.hadoop.hbase.testclassification.MediumTests;
033import org.apache.hadoop.hbase.testclassification.MiscTests;
034import org.apache.hadoop.hdfs.DFSClient;
035import org.apache.hadoop.hdfs.DFSOutputStream;
036import org.apache.hadoop.hdfs.DistributedFileSystem;
037import org.apache.hadoop.hdfs.DummyDFSOutputStream;
038import org.junit.AfterClass;
039import org.junit.BeforeClass;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.mockito.MockedConstruction;
044
045import org.apache.hbase.thirdparty.io.netty.channel.Channel;
046import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
047import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
048import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
049import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
050
051/**
052 * Make sure lease renewal works. Since it is in a background thread, normal read/write test can not
053 * verify it.
054 * <p>
055 * See HBASE-28955 for more details.
056 */
057@Category({ MiscTests.class, MediumTests.class })
058public class TestLeaseRenewal extends AsyncFSTestBase {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestLeaseRenewal.class);
063
064  private static DistributedFileSystem FS;
065  private static EventLoopGroup EVENT_LOOP_GROUP;
066  private static Class<? extends Channel> CHANNEL_CLASS;
067  private static StreamSlowMonitor MONITOR;
068
069  @BeforeClass
070  public static void setUp() throws Exception {
071    startMiniDFSCluster(3);
072    FS = CLUSTER.getFileSystem();
073    EVENT_LOOP_GROUP = new NioEventLoopGroup();
074    CHANNEL_CLASS = NioSocketChannel.class;
075    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
076  }
077
078  @AfterClass
079  public static void tearDown() throws Exception {
080    if (EVENT_LOOP_GROUP != null) {
081      EVENT_LOOP_GROUP.shutdownGracefully().get();
082    }
083    shutdownMiniDFSCluster();
084  }
085
086  private FanOutOneBlockAsyncDFSOutput create(String file)
087    throws IllegalArgumentException, IOException {
088    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
089    return FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/test_lease_renew"), true,
090      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
091  }
092
093  @Test
094  public void testLeaseRenew() throws IOException {
095    DFSClient client = FS.getClient();
096    assertFalse(client.renewLease());
097
098    FanOutOneBlockAsyncDFSOutput out = create("/test_lease_renew");
099    assertTrue(client.renewLease());
100    client.closeAllFilesBeingWritten(false);
101    assertTrue(out.isClosed());
102
103    assertFalse(client.renewLease());
104
105    out = create("/test_lease_renew");
106    assertTrue(client.renewLease());
107    client.closeAllFilesBeingWritten(true);
108    assertTrue(out.isClosed());
109  }
110
111  private Optional<Method> getUniqKeyMethod() {
112    try {
113      return Optional.of(DFSOutputStream.class.getMethod("getUniqKey"));
114    } catch (NoSuchMethodException e) {
115      // should be hadoop 3.3 or below
116      return Optional.empty();
117    }
118  }
119
120  @Test
121  public void testEnsureMethodsCalledWhenLeaseRenewal() throws Exception {
122    try (MockedConstruction<DummyDFSOutputStream> mocked =
123      mockConstruction(DummyDFSOutputStream.class)) {
124      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_lease_renewal")) {
125        DummyDFSOutputStream dummy = mocked.constructed().get(0);
126        assertTrue(FS.getClient().renewLease());
127        Optional<Method> getUniqKeyMethod = getUniqKeyMethod();
128        if (getUniqKeyMethod.isPresent()) {
129          getUniqKeyMethod.get().invoke(verify(dummy));
130          Method getNamespaceMethod = DFSOutputStream.class.getMethod("getNamespace");
131          getNamespaceMethod.invoke(verify(dummy));
132        } else {
133          verify(dummy).getFileId();
134        }
135        verifyNoMoreInteractions(dummy);
136      }
137    }
138  }
139
140  private void verifyGetUniqKey(DummyDFSOutputStream dummy) throws Exception {
141    Optional<Method> getUniqKeyMethod = getUniqKeyMethod();
142    if (getUniqKeyMethod.isPresent()) {
143      getUniqKeyMethod.get().invoke(verify(dummy));
144    } else {
145      verify(dummy).getFileId();
146    }
147  }
148
149  @Test
150  public void testEnsureMethodsCalledWhenClosing() throws Exception {
151    try (MockedConstruction<DummyDFSOutputStream> mocked =
152      mockConstruction(DummyDFSOutputStream.class)) {
153      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_closing")) {
154        DummyDFSOutputStream dummy = mocked.constructed().get(0);
155        verifyGetUniqKey(dummy);
156        FS.getClient().closeAllFilesBeingWritten(false);
157        verify(dummy).close();
158
159        verifyNoMoreInteractions(dummy);
160      }
161    }
162  }
163
164  @Test
165  public void testEnsureMethodsCalledWhenAborting() throws Exception {
166    try (MockedConstruction<DummyDFSOutputStream> mocked =
167      mockConstruction(DummyDFSOutputStream.class)) {
168      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_aborting")) {
169        DummyDFSOutputStream dummy = mocked.constructed().get(0);
170        verifyGetUniqKey(dummy);
171        FS.getClient().closeAllFilesBeingWritten(true);
172        verify(dummy).abort();
173        verifyNoMoreInteractions(dummy);
174      }
175    }
176  }
177}