/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.Closeable;
import java.io.File;
import java.net.URI;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.access.AccessController;
import org.apache.hadoop.hbase.security.access.PermissionStorage;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.TokenProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test different variants of the initTableMapperJob method.
 */
@Category({ MapReduceTests.class, MediumTests.class })
public class TestTableMapReduceUtil {
  private static final String HTTP_PRINCIPAL = "HTTP/localhost";

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableMapReduceUtil.class);

  @After
  public void after() {
    SaslClientAuthenticationProviders.reset();
  }

  /*
   * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because the method
   * depends on an online cluster.
   */

  @Test
  public void testInitTableMapperJob1() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    // test
    TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
      Text.class, job, false, WALInputFormat.class);
    assertEquals(WALInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  @Test
  public void testInitTableMapperJob2() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job, false, WALInputFormat.class);
    assertEquals(WALInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  @Test
  public void testInitTableMapperJob3() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job);
    assertEquals(TableInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  @Test
  public void testInitTableMapperJob4() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job, false);
    assertEquals(TableInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

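  /**
   * Starts a Kerberos-secured mini cluster on the given utility: enables security and visibility
   * labels, registers the AccessController and TokenProvider coprocessors, applies the secured
   * configuration for the given principal, and waits for the ACL table to come online. Returns a
   * Closeable that shuts the cluster back down.
   */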
  private static Closeable startSecureMiniCluster(HBaseTestingUtil util, MiniKdc kdc,
    String principal) throws Exception {
    Configuration conf = util.getConfiguration();

    SecureTestUtil.enableSecurity(conf);
    VisibilityTestUtil.enableVisiblityLabels(conf);
    SecureTestUtil.verifyConfiguration(conf);

    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      AccessController.class.getName() + ',' + TokenProvider.class.getName());

    HBaseKerberosUtils.setSecuredConfiguration(conf, principal + '@' + kdc.getRealm(),
      HTTP_PRINCIPAL + '@' + kdc.getRealm());

    util.startMiniCluster();
    try {
      util.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
    } catch (Exception e) {
      util.shutdownMiniCluster();
      throw e;
    }

    return util::shutdownMiniCluster;
  }

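  /**
   * Both clusters are insecure, so initCredentialsForCluster should not obtain any delegation
   * token.
   */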
  @Test
  public void testInitCredentialsForCluster1() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    HBaseTestingUtil util2 = new HBaseTestingUtil();

    util1.startMiniCluster();
    try {
      util2.startMiniCluster();
      try {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertTrue(tokens.isEmpty());
      } finally {
        util2.shutdownMiniCluster();
      }
    } finally {
      util1.shutdownMiniCluster();
    }
  }

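  /**
   * Both clusters are secured with Kerberos, so initCredentialsForCluster should obtain an
   * authentication token for the remote (util2) cluster, keyed by that cluster's id.
   */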
  @Test
  @SuppressWarnings("unchecked")
  public void testInitCredentialsForCluster2() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    HBaseTestingUtil util2 = new HBaseTestingUtil();

    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    try {
      String username = UserGroupInformation.getLoginUser().getShortUserName();
      String userPrincipal = username + "/localhost";
      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());

      try (Closeable ignored1 = startSecureMiniCluster(util1, kdc, userPrincipal);
        Closeable ignored2 = startSecureMiniCluster(util2, kdc, userPrincipal)) {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertEquals(1, tokens.size());

        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
        Token<AuthenticationTokenIdentifier> tokenForCluster =
          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
        assertEquals(userPrincipal + '@' + kdc.getRealm(),
          tokenForCluster.decodeIdentifier().getUsername());
      }
    } finally {
      kdc.stop();
    }
  }

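  /**
   * The local cluster (util1) is secure but the remote cluster (util2) is insecure, so no token
   * should be obtained for the remote cluster.
   */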
  @Test
  public void testInitCredentialsForCluster3() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();

    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    try {
      String username = UserGroupInformation.getLoginUser().getShortUserName();
      String userPrincipal = username + "/localhost";
      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());

      try (Closeable ignored1 = startSecureMiniCluster(util1, kdc, userPrincipal)) {
        HBaseTestingUtil util2 = new HBaseTestingUtil();
        // Assume util2 is an insecure cluster. Do not start util2, because a secured mini cluster
        // and an insecure mini cluster cannot be booted at the same time.

        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertTrue(tokens.isEmpty());
      }
    } finally {
      kdc.stop();
    }
  }

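  /**
   * The local cluster (util1) is insecure but the remote cluster (util2) is secure, so a token for
   * the remote cluster should still be obtained.
   */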
  @Test
  @SuppressWarnings("unchecked")
  public void testInitCredentialsForCluster4() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    // Assume util1 is an insecure cluster. Do not start util1, because a secured mini cluster and
    // an insecure mini cluster cannot be booted at the same time.

    HBaseTestingUtil util2 = new HBaseTestingUtil();
    File keytab = new File(util2.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util2.setupMiniKdc(keytab);
    try {
      String username = UserGroupInformation.getLoginUser().getShortUserName();
      String userPrincipal = username + "/localhost";
      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());

      try (Closeable ignored2 = startSecureMiniCluster(util2, kdc, userPrincipal)) {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertEquals(1, tokens.size());

        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
        Token<AuthenticationTokenIdentifier> tokenForCluster =
          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
        assertEquals(userPrincipal + '@' + kdc.getRealm(),
          tokenForCluster.decodeIdentifier().getUsername());
      }
    } finally {
      kdc.stop();
    }
  }

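  /**
   * Both clusters are secure; the remote cluster is addressed through a connection URI rather than
   * a Configuration, so the token should be obtained for the cluster the URI points at.
   */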
  @Test
  @SuppressWarnings("unchecked")
  public void testInitCredentialsForClusterUri() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    HBaseTestingUtil util2 = new HBaseTestingUtil();

    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    try {
      String username = UserGroupInformation.getLoginUser().getShortUserName();
      String userPrincipal = username + "/localhost";
      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());

      try (Closeable ignored1 = startSecureMiniCluster(util1, kdc, userPrincipal);
        Closeable ignored2 = startSecureMiniCluster(util2, kdc, userPrincipal)) {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        // Use the Configuration from util1 and the URI from util2, to make sure the URI is used
        // instead of relying on the Configuration.
        TableMapReduceUtil.initCredentialsForCluster(job, util1.getConfiguration(),
          new URI(util2.getRpcConnnectionURI()));

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertEquals(1, tokens.size());

        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
        Token<AuthenticationTokenIdentifier> tokenForCluster =
          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
        assertEquals(userPrincipal + '@' + kdc.getRealm(),
          tokenForCluster.decodeIdentifier().getUsername());
      }
    } finally {
      kdc.stop();
    }
  }
}