001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.util.Collections;
021import java.util.Map;
022import java.util.concurrent.CompletableFuture;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.function.Function;
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.util.FutureUtils;
027import org.apache.yetus.audience.InterfaceAudience;
028
029import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
030
031/**
032 * Additional Asynchronous Admin capabilities for clients.
033 */
034@InterfaceAudience.Public
035public final class AsyncAdminClientUtils {
036
037  private AsyncAdminClientUtils() {
038  }
039
040  /**
041   * Execute the given coprocessor call on all region servers.
042   * <p>
043   * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
044   * one line lambda expression, like:
045   *
046   * <pre>
047   * channel -&gt; xxxService.newStub(channel)
048   * </pre>
049   *
050   * @param asyncAdmin the asynchronous administrative API for HBase.
051   * @param stubMaker  a delegation to the actual {@code newStub} call.
052   * @param callable   a delegation to the actual protobuf rpc call. See the comment of
053   *                   {@link ServiceCaller} for more details.
054   * @param <S>        the type of the asynchronous stub
055   * @param <R>        the type of the return value
056   * @return Map of each region server to its result of the protobuf rpc call, wrapped by a
057   *         {@link CompletableFuture}.
058   * @see ServiceCaller
059   */
060  public static <S, R> CompletableFuture<Map<ServerName, Object>>
061    coprocessorServiceOnAllRegionServers(AsyncAdmin asyncAdmin, Function<RpcChannel, S> stubMaker,
062      ServiceCaller<S, R> callable) {
063    CompletableFuture<Map<ServerName, Object>> future = new CompletableFuture<>();
064    FutureUtils.addListener(asyncAdmin.getRegionServers(), (regionServers, error) -> {
065      if (error != null) {
066        future.completeExceptionally(error);
067        return;
068      }
069      Map<ServerName, Object> resultMap = new ConcurrentHashMap<>();
070      for (ServerName regionServer : regionServers) {
071        FutureUtils.addListener(asyncAdmin.coprocessorService(stubMaker, callable, regionServer),
072          (server, err) -> {
073            if (err != null) {
074              resultMap.put(regionServer, err);
075            } else {
076              resultMap.put(regionServer, server);
077            }
078            if (resultMap.size() == regionServers.size()) {
079              future.complete(Collections.unmodifiableMap(resultMap));
080            }
081          });
082      }
083    });
084    return future;
085  }
086}