001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import com.google.protobuf.RpcChannel;
021import java.util.Collections;
022import java.util.Map;
023import java.util.concurrent.CompletableFuture;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.function.Function;
026import org.apache.hadoop.hbase.ServerName;
027import org.apache.hadoop.hbase.util.FutureUtils;
028import org.apache.yetus.audience.InterfaceAudience;
029
030/**
031 * Additional Asynchronous Admin capabilities for clients.
032 */
033@InterfaceAudience.Public
034public final class AsyncAdminClientUtils {
035
036  private AsyncAdminClientUtils() {
037  }
038
039  /**
040   * Execute the given coprocessor call on all region servers.
041   * <p>
042   * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
043   * one line lambda expression, like:
044   *
045   * <pre>
046   * channel -&gt; xxxService.newStub(channel)
047   * </pre>
048   *
049   * @param asyncAdmin the asynchronous administrative API for HBase.
050   * @param stubMaker  a delegation to the actual {@code newStub} call.
051   * @param callable   a delegation to the actual protobuf rpc call. See the comment of
052   *                   {@link ServiceCaller} for more details.
053   * @param <S>        the type of the asynchronous stub
054   * @param <R>        the type of the return value
055   * @return Map of each region server to its result of the protobuf rpc call, wrapped by a
056   *         {@link CompletableFuture}.
057   * @see ServiceCaller
058   */
059  public static <S, R> CompletableFuture<Map<ServerName, Object>>
060    coprocessorServiceOnAllRegionServers(AsyncAdmin asyncAdmin, Function<RpcChannel, S> stubMaker,
061      ServiceCaller<S, R> callable) {
062    CompletableFuture<Map<ServerName, Object>> future = new CompletableFuture<>();
063    FutureUtils.addListener(asyncAdmin.getRegionServers(), (regionServers, error) -> {
064      if (error != null) {
065        future.completeExceptionally(error);
066        return;
067      }
068      Map<ServerName, Object> resultMap = new ConcurrentHashMap<>();
069      for (ServerName regionServer : regionServers) {
070        FutureUtils.addListener(asyncAdmin.coprocessorService(stubMaker, callable, regionServer),
071          (server, err) -> {
072            if (err != null) {
073              resultMap.put(regionServer, err);
074            } else {
075              resultMap.put(regionServer, server);
076            }
077            if (resultMap.size() == regionServers.size()) {
078              future.complete(Collections.unmodifiableMap(resultMap));
079            }
080          });
081      }
082    });
083    return future;
084  }
085}