001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.util.Collections; 021import java.util.Map; 022import java.util.concurrent.CompletableFuture; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.function.Function; 025import org.apache.hadoop.hbase.ServerName; 026import org.apache.hadoop.hbase.util.FutureUtils; 027import org.apache.yetus.audience.InterfaceAudience; 028 029import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 030 031/** 032 * Additional Asynchronous Admin capabilities for clients. 033 */ 034@InterfaceAudience.Public 035public final class AsyncAdminClientUtils { 036 037 private AsyncAdminClientUtils() { 038 } 039 040 /** 041 * Execute the given coprocessor call on all region servers. 042 * <p> 043 * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a 044 * one line lambda expression, like: 045 * 046 * <pre> 047 * channel -> xxxService.newStub(channel) 048 * </pre> 049 * 050 * @param asyncAdmin the asynchronous administrative API for HBase. 051 * @param stubMaker a delegation to the actual {@code newStub} call. 052 * @param callable a delegation to the actual protobuf rpc call. See the comment of 053 * {@link ServiceCaller} for more details. 054 * @param <S> the type of the asynchronous stub 055 * @param <R> the type of the return value 056 * @return Map of each region server to its result of the protobuf rpc call, wrapped by a 057 * {@link CompletableFuture}. 058 * @see ServiceCaller 059 */ 060 public static <S, R> CompletableFuture<Map<ServerName, Object>> 061 coprocessorServiceOnAllRegionServers(AsyncAdmin asyncAdmin, Function<RpcChannel, S> stubMaker, 062 ServiceCaller<S, R> callable) { 063 CompletableFuture<Map<ServerName, Object>> future = new CompletableFuture<>(); 064 FutureUtils.addListener(asyncAdmin.getRegionServers(), (regionServers, error) -> { 065 if (error != null) { 066 future.completeExceptionally(error); 067 return; 068 } 069 Map<ServerName, Object> resultMap = new ConcurrentHashMap<>(); 070 for (ServerName regionServer : regionServers) { 071 FutureUtils.addListener(asyncAdmin.coprocessorService(stubMaker, callable, regionServer), 072 (server, err) -> { 073 if (err != null) { 074 resultMap.put(regionServer, err); 075 } else { 076 resultMap.put(regionServer, server); 077 } 078 if (resultMap.size() == regionServers.size()) { 079 future.complete(Collections.unmodifiableMap(resultMap)); 080 } 081 }); 082 } 083 }); 084 return future; 085 } 086}