/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;

import java.io.File;
import java.io.IOException;
import java.net.BindException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Writes through {@link FanOutOneBlockAsyncDFSOutput} against a Kerberos-secured mini DFS cluster,
 * once for every combination of {@code dfs.data.transfer.protection} level, data transfer
 * encryption algorithm and cipher suite produced by {@link #data()}. Each run writes one file in
 * a plain directory and one file inside an HDFS encryption zone.
 */
@RunWith(Parameterized.class)
@Category({ MiscTests.class, LargeTests.class })
public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestSaslFanOutOneBlockAsyncDFSOutput.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSaslFanOutOneBlockAsyncDFSOutput.class);

  /** How many times starting the MiniKdc is attempted before a BindException is propagated. */
  private static final int MAX_KDC_START_ATTEMPTS = 3;

  private static final int READ_TIMEOUT_MS = 200000;

  private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());

  private static final String HOST = "localhost";

  private static final String TEST_KEY_NAME = "test_key";

  // Re-assigned in setUp() for every parameterized run, hence not final.
  private static DistributedFileSystem FS;

  private static EventLoopGroup EVENT_LOOP_GROUP;

  private static Class<? extends Channel> CHANNEL_CLASS;

  private static MiniKdc KDC;

  private static String USERNAME;

  private static String PRINCIPAL;

  private static String HTTP_PRINCIPAL;

  private static StreamSlowMonitor MONITOR;

  @Rule
  public TestName name = new TestName();

  @Parameter(0)
  public String protection;

  @Parameter(1)
  public String encryptionAlgorithm;

  @Parameter(2)
  public String cipherSuite;

  /**
   * Builds the cross product of protection levels, encryption algorithms and cipher suites. An
   * empty string means "leave the corresponding setting unset" (see {@link #setUp()}).
   * @return one {@code {protection, encryptionAlgorithm, cipherSuite}} triple per test run
   */
  @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}")
  public static Iterable<Object[]> data() {
    List<Object[]> params = new ArrayList<>();
    for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
      for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
        for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) {
          params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite });
        }
      }
    }
    return params;
  }

  /**
   * Points {@code dfs.encryption.key.provider.uri} at a JCEKS key store under the test data
   * directory and creates {@link #TEST_KEY_NAME} in it.
   * @param conf configuration that receives the key provider URI and supplies the key options
   * @throws Exception if the key provider cannot be obtained or the key cannot be created
   */
  private static void setUpKeyProvider(Configuration conf) throws Exception {
    URI keyProviderUri =
      new URI("jceks://file" + UTIL.getDataTestDir("test.jks").toUri().toString());
    conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
    KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf);
    try {
      keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf));
      keyProvider.flush();
    } finally {
      // Release the provider even when key creation or flushing fails.
      keyProvider.close();
    }
  }

  /**
   * Starts a {@link MiniKdc} for testing security and publishes the given keytab file through the
   * {@link SecurityConstants#REGIONSERVER_KRB_KEYTAB_FILE} system property.
   * <p>
   * Starting is retried up to {@link #MAX_KDC_START_ATTEMPTS} times when a {@link BindException}
   * is hit; the KDC working directory is wiped before each retry.
   * @param keytabFile keytab file whose absolute path is exported via the system property
   * @return the started KDC
   * @throws Exception if the KDC cannot be started; a {@link BindException} once all attempts
   *                   are used up
   */
  private static MiniKdc setupMiniKdc(File keytabFile) throws Exception {
    Properties conf = MiniKdc.createConf();
    conf.put(MiniKdc.DEBUG, true);
    MiniKdc kdc = null;
    int numTries = 0;
    // There is time lag between selecting a port and trying to bind with it. It's possible that
    // another service captures the port in between which'll result in BindException.
    while (kdc == null) {
      File dir = new File(UTIL.getDataTestDir("kdc").toUri().getPath());
      try {
        MiniKdc candidate = new MiniKdc(conf, dir);
        candidate.start();
        kdc = candidate;
      } catch (BindException e) {
        FileUtils.deleteDirectory(dir); // clean directory
        numTries++;
        if (numTries >= MAX_KDC_START_ATTEMPTS) {
          LOG.error("Failed setting up MiniKDC. Tried {} times.", numTries, e);
          throw e;
        }
        LOG.error("BindException encountered when setting up MiniKdc. Trying again.", e);
      }
    }
    System.setProperty(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
      keytabFile.getAbsolutePath());
    return kdc;
  }

  /**
   * One-time setup: creates the Netty event loop group, starts the KDC, registers the service and
   * HTTP principals in the keytab, creates the encryption key and applies the secured and SSL
   * settings to the shared test configuration.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    EVENT_LOOP_GROUP = new NioEventLoopGroup();
    CHANNEL_CLASS = NioSocketChannel.class;
    Configuration conf = UTIL.getConfiguration();
    conf.setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
    KDC = setupMiniKdc(KEYTAB_FILE);
    USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
    PRINCIPAL = USERNAME + "/" + HOST;
    HTTP_PRINCIPAL = "HTTP/" + HOST;
    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL);

    setUpKeyProvider(conf);
    HBaseKerberosUtils.setSecuredConfiguration(conf, PRINCIPAL + "@" + KDC.getRealm(),
      HTTP_PRINCIPAL + "@" + KDC.getRealm());
    HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class);
    MONITOR = StreamSlowMonitor.create(conf, "testMonitor");
  }

  /**
   * One-time teardown. Each resource is released in its own {@code finally} so that a failure
   * while shutting one down does not leave the others running.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    try {
      if (EVENT_LOOP_GROUP != null) {
        EVENT_LOOP_GROUP.shutdownGracefully().get();
      }
    } finally {
      try {
        if (KDC != null) {
          KDC.stop();
        }
      } finally {
        shutdownMiniDFSCluster();
      }
    }
  }

  private Path testDirOnTestFs;

  private Path encryptionTestDirOnTestFs;

  /** Turns {@link #encryptionTestDirOnTestFs} into an encryption zone keyed by the test key. */
  private void createEncryptionZone() throws Exception {
    FS.createEncryptionZone(encryptionTestDirOnTestFs, TEST_KEY_NAME);
  }

  /**
   * Applies this run's parameters to the configuration, starts a three datanode mini DFS cluster
   * and creates the plain and encryption-zone test directories.
   */
  @Before
  public void setUp() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.set("dfs.data.transfer.protection", protection);

    boolean algorithmBlank = StringUtils.isBlank(encryptionAlgorithm);
    boolean cipherSuiteBlank = StringUtils.isBlank(cipherSuite);
    // Data transfer encryption is enabled as soon as either an algorithm or a cipher suite is set.
    conf.setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, !(algorithmBlank && cipherSuiteBlank));
    // The configuration is shared between runs, so clear what a previous run may have set.
    if (algorithmBlank) {
      conf.unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
    } else {
      conf.set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm);
    }
    if (cipherSuiteBlank) {
      conf.unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
    } else {
      conf.set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite);
    }

    startMiniDFSCluster(3);
    FS = CLUSTER.getFileSystem();
    // The parameterized method name contains characters such as '[' and ':'; replace everything
    // that is not alphanumeric before using it as a directory name.
    testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
    FS.mkdirs(testDirOnTestFs);
    encryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc");
    FS.mkdirs(encryptionTestDirOnTestFs);
    createEncryptionZone();
  }

  /** Shuts down the mini DFS cluster started for this run. */
  @After
  public void tearDown() throws IOException {
    shutdownMiniDFSCluster();
  }

  private Path getTestFile() {
    return new Path(testDirOnTestFs, "test");
  }

  private Path getEncryptionTestFile() {
    return new Path(encryptionTestDirOnTestFs, "test");
  }

  /**
   * Creates an async output for {@code file} and delegates the write/read-back check to
   * {@link TestFanOutOneBlockAsyncDFSOutput#writeAndVerify}.
   * @param file path of the file to create on the test file system
   */
  private void test(Path file) throws IOException, InterruptedException, ExecutionException {
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
  }

  /** Exercises the output against both a plain file and a file inside the encryption zone. */
  @Test
  public void test() throws IOException, InterruptedException, ExecutionException {
    test(getTestFile());
    test(getEncryptionTestFile());
  }
}