001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; 021 022import java.io.IOException; 023import java.util.Collections; 024import java.util.List; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.HConstants; 027import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 028import org.apache.hadoop.hbase.util.Pair; 029import org.apache.hadoop.security.token.Token; 030import org.apache.yetus.audience.InterfaceAudience; 031 032import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 033import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 035import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 041import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 042 043/** 044 * Client proxy for SecureBulkLoadProtocol 045 */ 046@InterfaceAudience.Private 047public class SecureBulkLoadClient { 048 private Table table; 049 private final RpcControllerFactory rpcControllerFactory; 050 051 public SecureBulkLoadClient(final Configuration conf, Table table) { 052 this.table = table; 053 this.rpcControllerFactory = new RpcControllerFactory(conf); 054 } 055 056 public String prepareBulkLoad(final Connection conn) throws IOException { 057 try { 058 ClientServiceCallable<String> callable = 059 new ClientServiceCallable<String>(conn, table.getName(), HConstants.EMPTY_START_ROW, 060 this.rpcControllerFactory.newController(), PRIORITY_UNSET, Collections.emptyMap()) { 061 @Override 062 protected String rpcCall() throws Exception { 063 byte[] regionName = getLocation().getRegionInfo().getRegionName(); 064 RegionSpecifier region = 065 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); 066 PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder() 067 .setTableName(ProtobufUtil.toProtoTableName(table.getName())).setRegion(region) 068 .build(); 069 PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request); 070 return response.getBulkToken(); 071 } 072 }; 073 return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null) 074 .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE); 075 } catch (Throwable throwable) { 076 throw new IOException(throwable); 077 } 078 } 079 080 public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException { 081 try { 082 ClientServiceCallable<Void> callable = 083 new ClientServiceCallable<Void>(conn, table.getName(), HConstants.EMPTY_START_ROW, 084 this.rpcControllerFactory.newController(), PRIORITY_UNSET, Collections.emptyMap()) { 085 @Override 086 protected Void rpcCall() throws Exception { 087 byte[] regionName = getLocation().getRegionInfo().getRegionName(); 088 RegionSpecifier region = 089 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); 090 CleanupBulkLoadRequest request = 091 CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build(); 092 getStub().cleanupBulkLoad(null, request); 093 return null; 094 } 095 }; 096 RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null).<Void> newCaller() 097 .callWithRetries(callable, Integer.MAX_VALUE); 098 } catch (Throwable throwable) { 099 throw new IOException(throwable); 100 } 101 } 102 103 /** 104 * Securely bulk load a list of HFiles using client protocol. 105 * @return true if all are loaded 106 */ 107 public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client, 108 final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum, 109 final Token<?> userToken, final String bulkToken) throws IOException { 110 return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken, 111 false, null, true); 112 } 113 114 /** 115 * Securely bulk load a list of HFiles using client protocol. 116 * @return true if all are loaded 117 */ 118 public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client, 119 final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum, 120 final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException { 121 return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken, 122 copyFiles, null, true); 123 } 124 125 public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client, 126 final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum, 127 final Token<?> userToken, final String bulkToken, boolean copyFiles, List<String> clusterIds, 128 boolean replicate) throws IOException { 129 BulkLoadHFileRequest request = RequestConverter.buildBulkLoadHFileRequest(familyPaths, 130 regionName, assignSeqNum, userToken, bulkToken, copyFiles, clusterIds, replicate); 131 132 try { 133 BulkLoadHFileResponse response = client.bulkLoadHFile(null, request); 134 return response.getLoaded(); 135 } catch (Exception se) { 136 throw ProtobufUtil.handleRemoteException(se); 137 } 138 } 139}