001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
021
022import java.io.IOException;
023import java.util.Collections;
024import java.util.List;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
028import org.apache.hadoop.hbase.util.Pair;
029import org.apache.hadoop.security.token.Token;
030import org.apache.yetus.audience.InterfaceAudience;
031
032import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
033import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
042
043/**
044 * Client proxy for SecureBulkLoadProtocol
045 */
046@InterfaceAudience.Private
047public class SecureBulkLoadClient {
048  private Table table;
049  private final RpcControllerFactory rpcControllerFactory;
050
051  public SecureBulkLoadClient(final Configuration conf, Table table) {
052    this.table = table;
053    this.rpcControllerFactory = new RpcControllerFactory(conf);
054  }
055
056  public String prepareBulkLoad(final Connection conn) throws IOException {
057    try {
058      ClientServiceCallable<String> callable =
059        new ClientServiceCallable<String>(conn, table.getName(), HConstants.EMPTY_START_ROW,
060          this.rpcControllerFactory.newController(), PRIORITY_UNSET, Collections.emptyMap()) {
061          @Override
062          protected String rpcCall() throws Exception {
063            byte[] regionName = getLocation().getRegionInfo().getRegionName();
064            RegionSpecifier region =
065              RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
066            PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder()
067              .setTableName(ProtobufUtil.toProtoTableName(table.getName())).setRegion(region)
068              .build();
069            PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request);
070            return response.getBulkToken();
071          }
072        };
073      return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
074        .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
075    } catch (Throwable throwable) {
076      throw new IOException(throwable);
077    }
078  }
079
080  public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
081    try {
082      ClientServiceCallable<Void> callable =
083        new ClientServiceCallable<Void>(conn, table.getName(), HConstants.EMPTY_START_ROW,
084          this.rpcControllerFactory.newController(), PRIORITY_UNSET, Collections.emptyMap()) {
085          @Override
086          protected Void rpcCall() throws Exception {
087            byte[] regionName = getLocation().getRegionInfo().getRegionName();
088            RegionSpecifier region =
089              RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
090            CleanupBulkLoadRequest request =
091              CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build();
092            getStub().cleanupBulkLoad(null, request);
093            return null;
094          }
095        };
096      RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null).<Void> newCaller()
097        .callWithRetries(callable, Integer.MAX_VALUE);
098    } catch (Throwable throwable) {
099      throw new IOException(throwable);
100    }
101  }
102
103  /**
104   * Securely bulk load a list of HFiles using client protocol.
105   * @return true if all are loaded
106   */
107  public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
108    final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
109    final Token<?> userToken, final String bulkToken) throws IOException {
110    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken,
111      false, null, true);
112  }
113
114  /**
115   * Securely bulk load a list of HFiles using client protocol.
116   * @return true if all are loaded
117   */
118  public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
119    final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
120    final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException {
121    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken,
122      copyFiles, null, true);
123  }
124
125  public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
126    final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
127    final Token<?> userToken, final String bulkToken, boolean copyFiles, List<String> clusterIds,
128    boolean replicate) throws IOException {
129    BulkLoadHFileRequest request = RequestConverter.buildBulkLoadHFileRequest(familyPaths,
130      regionName, assignSeqNum, userToken, bulkToken, copyFiles, clusterIds, replicate);
131
132    try {
133      BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
134      return response.getLoaded();
135    } catch (Exception se) {
136      throw ProtobufUtil.handleRemoteException(se);
137    }
138  }
139}