001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import com.google.protobuf.RpcChannel; 021import java.util.Collections; 022import java.util.Map; 023import java.util.concurrent.CompletableFuture; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.function.Function; 026import org.apache.hadoop.hbase.ServerName; 027import org.apache.hadoop.hbase.util.FutureUtils; 028import org.apache.yetus.audience.InterfaceAudience; 029 030/** 031 * Additional Asynchronous Admin capabilities for clients. 032 */ 033@InterfaceAudience.Public 034public final class AsyncAdminClientUtils { 035 036 private AsyncAdminClientUtils() { 037 } 038 039 /** 040 * Execute the given coprocessor call on all region servers. 041 * <p> 042 * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a 043 * one line lambda expression, like: 044 * 045 * <pre> 046 * channel -> xxxService.newStub(channel) 047 * </pre> 048 * 049 * @param asyncAdmin the asynchronous administrative API for HBase. 050 * @param stubMaker a delegation to the actual {@code newStub} call. 051 * @param callable a delegation to the actual protobuf rpc call. See the comment of 052 * {@link ServiceCaller} for more details. 053 * @param <S> the type of the asynchronous stub 054 * @param <R> the type of the return value 055 * @return Map of each region server to its result of the protobuf rpc call, wrapped by a 056 * {@link CompletableFuture}. 057 * @see ServiceCaller 058 */ 059 public static <S, R> CompletableFuture<Map<ServerName, Object>> 060 coprocessorServiceOnAllRegionServers(AsyncAdmin asyncAdmin, Function<RpcChannel, S> stubMaker, 061 ServiceCaller<S, R> callable) { 062 CompletableFuture<Map<ServerName, Object>> future = new CompletableFuture<>(); 063 FutureUtils.addListener(asyncAdmin.getRegionServers(), (regionServers, error) -> { 064 if (error != null) { 065 future.completeExceptionally(error); 066 return; 067 } 068 Map<ServerName, Object> resultMap = new ConcurrentHashMap<>(); 069 for (ServerName regionServer : regionServers) { 070 FutureUtils.addListener(asyncAdmin.coprocessorService(stubMaker, callable, regionServer), 071 (server, err) -> { 072 if (err != null) { 073 resultMap.put(regionServer, err); 074 } else { 075 resultMap.put(regionServer, server); 076 } 077 if (resultMap.size() == regionServers.size()) { 078 future.complete(Collections.unmodifiableMap(resultMap)); 079 } 080 }); 081 } 082 }); 083 return future; 084 } 085}