001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; 021import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY; 022import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; 023import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; 024 025import java.io.File; 026import java.io.IOException; 027import java.lang.reflect.Method; 028import java.net.BindException; 029import java.net.URI; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.List; 033import java.util.Properties; 034import java.util.concurrent.ExecutionException; 035import org.apache.commons.io.FileUtils; 036import org.apache.commons.lang3.StringUtils; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.crypto.CipherSuite; 039import org.apache.hadoop.crypto.key.KeyProvider; 040import org.apache.hadoop.crypto.key.KeyProviderFactory; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 044import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 045import org.apache.hadoop.hbase.security.SecurityConstants; 046import org.apache.hadoop.hbase.testclassification.LargeTests; 047import org.apache.hadoop.hbase.testclassification.MiscTests; 048import org.apache.hadoop.hdfs.DistributedFileSystem; 049import org.apache.hadoop.minikdc.MiniKdc; 050import org.apache.hadoop.security.UserGroupInformation; 051import org.junit.After; 052import org.junit.AfterClass; 053import org.junit.Before; 054import org.junit.BeforeClass; 055import org.junit.ClassRule; 056import org.junit.Rule; 057import org.junit.Test; 058import org.junit.experimental.categories.Category; 059import org.junit.rules.TestName; 060import org.junit.runner.RunWith; 061import org.junit.runners.Parameterized; 062import org.junit.runners.Parameterized.Parameter; 063import org.junit.runners.Parameterized.Parameters; 064import org.slf4j.Logger; 065import org.slf4j.LoggerFactory; 066 067import org.apache.hbase.thirdparty.io.netty.channel.Channel; 068import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 069import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 070import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 071import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 072 073@RunWith(Parameterized.class) 074@Category({ MiscTests.class, LargeTests.class }) 075public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { 076 077 private static final Logger LOG = 078 LoggerFactory.getLogger(TestSaslFanOutOneBlockAsyncDFSOutput.class); 079 080 @ClassRule 081 public static final HBaseClassTestRule CLASS_RULE = 082 HBaseClassTestRule.forClass(TestSaslFanOutOneBlockAsyncDFSOutput.class); 083 084 private static DistributedFileSystem FS; 085 086 private static EventLoopGroup EVENT_LOOP_GROUP; 087 088 private static Class<? extends Channel> CHANNEL_CLASS; 089 090 private static int READ_TIMEOUT_MS = 200000; 091 092 private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath()); 093 094 private static MiniKdc KDC; 095 096 private static String HOST = "localhost"; 097 098 private static String USERNAME; 099 100 private static String PRINCIPAL; 101 102 private static String HTTP_PRINCIPAL; 103 104 private static String TEST_KEY_NAME = "test_key"; 105 106 private static StreamSlowMonitor MONITOR; 107 108 @Rule 109 public TestName name = new TestName(); 110 111 @Parameter(0) 112 public String protection; 113 114 @Parameter(1) 115 public String encryptionAlgorithm; 116 117 @Parameter(2) 118 public String cipherSuite; 119 120 @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}") 121 public static Iterable<Object[]> data() { 122 List<Object[]> params = new ArrayList<>(); 123 for (String protection : Arrays.asList("authentication", "integrity", "privacy")) { 124 for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) { 125 for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) { 126 params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite }); 127 } 128 } 129 } 130 return params; 131 } 132 133 private static void setUpKeyProvider(Configuration conf) throws Exception { 134 URI keyProviderUri = 135 new URI("jceks://file" + UTIL.getDataTestDir("test.jks").toUri().toString()); 136 conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString()); 137 KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf); 138 keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf)); 139 keyProvider.flush(); 140 keyProvider.close(); 141 } 142 143 /** 144 * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given 145 * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. 146 */ 147 private static MiniKdc setupMiniKdc(File keytabFile) throws Exception { 148 Properties conf = MiniKdc.createConf(); 149 conf.put(MiniKdc.DEBUG, true); 150 MiniKdc kdc = null; 151 File dir = null; 152 // There is time lag between selecting a port and trying to bind with it. It's possible that 153 // another service captures the port in between which'll result in BindException. 154 boolean bindException; 155 int numTries = 0; 156 do { 157 try { 158 bindException = false; 159 dir = new File(UTIL.getDataTestDir("kdc").toUri().getPath()); 160 kdc = new MiniKdc(conf, dir); 161 kdc.start(); 162 } catch (BindException e) { 163 FileUtils.deleteDirectory(dir); // clean directory 164 numTries++; 165 if (numTries == 3) { 166 LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times."); 167 throw e; 168 } 169 LOG.error("BindException encountered when setting up MiniKdc. Trying again."); 170 bindException = true; 171 } 172 } while (bindException); 173 System.setProperty(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE, 174 keytabFile.getAbsolutePath()); 175 return kdc; 176 } 177 178 @BeforeClass 179 public static void setUpBeforeClass() throws Exception { 180 EVENT_LOOP_GROUP = new NioEventLoopGroup(); 181 CHANNEL_CLASS = NioSocketChannel.class; 182 UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); 183 KDC = setupMiniKdc(KEYTAB_FILE); 184 USERNAME = UserGroupInformation.getLoginUser().getShortUserName(); 185 PRINCIPAL = USERNAME + "/" + HOST; 186 HTTP_PRINCIPAL = "HTTP/" + HOST; 187 KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL); 188 189 setUpKeyProvider(UTIL.getConfiguration()); 190 HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration(), 191 PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm()); 192 HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class); 193 MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor"); 194 } 195 196 @AfterClass 197 public static void tearDownAfterClass() throws Exception { 198 if (EVENT_LOOP_GROUP != null) { 199 EVENT_LOOP_GROUP.shutdownGracefully().get(); 200 } 201 if (KDC != null) { 202 KDC.stop(); 203 } 204 shutdownMiniDFSCluster(); 205 } 206 207 private Path testDirOnTestFs; 208 209 private Path entryptionTestDirOnTestFs; 210 211 private void createEncryptionZone() throws Exception { 212 Method method = 213 DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class); 214 method.invoke(FS, entryptionTestDirOnTestFs, TEST_KEY_NAME); 215 } 216 217 @Before 218 public void setUp() throws Exception { 219 UTIL.getConfiguration().set("dfs.data.transfer.protection", protection); 220 if (StringUtils.isBlank(encryptionAlgorithm) && StringUtils.isBlank(cipherSuite)) { 221 UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false); 222 } else { 223 UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true); 224 } 225 if (StringUtils.isBlank(encryptionAlgorithm)) { 226 UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); 227 } else { 228 UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm); 229 } 230 if (StringUtils.isBlank(cipherSuite)) { 231 UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); 232 } else { 233 UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite); 234 } 235 236 startMiniDFSCluster(3); 237 FS = CLUSTER.getFileSystem(); 238 testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_")); 239 FS.mkdirs(testDirOnTestFs); 240 entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc"); 241 FS.mkdirs(entryptionTestDirOnTestFs); 242 createEncryptionZone(); 243 } 244 245 @After 246 public void tearDown() throws IOException { 247 shutdownMiniDFSCluster(); 248 } 249 250 private Path getTestFile() { 251 return new Path(testDirOnTestFs, "test"); 252 } 253 254 private Path getEncryptionTestFile() { 255 return new Path(entryptionTestDirOnTestFs, "test"); 256 } 257 258 private void test(Path file) throws IOException, InterruptedException, ExecutionException { 259 EventLoop eventLoop = EVENT_LOOP_GROUP.next(); 260 FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file, 261 true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); 262 TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out); 263 } 264 265 @Test 266 public void test() throws IOException, InterruptedException, ExecutionException { 267 test(getTestFile()); 268 test(getEncryptionTestFile()); 269 } 270}