001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import com.google.protobuf.Message;
026import com.google.protobuf.RpcChannel;
027import edu.umd.cs.findbugs.annotations.Nullable;
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.EnumSet;
033import java.util.HashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.Optional;
037import java.util.Set;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ConcurrentLinkedQueue;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicReference;
043import java.util.function.BiConsumer;
044import java.util.function.Consumer;
045import java.util.function.Function;
046import java.util.function.Supplier;
047import java.util.regex.Pattern;
048import java.util.stream.Collectors;
049import java.util.stream.Stream;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
052import org.apache.hadoop.hbase.CacheEvictionStats;
053import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.ClusterMetricsBuilder;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.hadoop.hbase.HRegionLocation;
060import org.apache.hadoop.hbase.MetaTableAccessor;
061import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
062import org.apache.hadoop.hbase.NamespaceDescriptor;
063import org.apache.hadoop.hbase.RegionLocations;
064import org.apache.hadoop.hbase.RegionMetrics;
065import org.apache.hadoop.hbase.RegionMetricsBuilder;
066import org.apache.hadoop.hbase.ServerName;
067import org.apache.hadoop.hbase.TableExistsException;
068import org.apache.hadoop.hbase.TableName;
069import org.apache.hadoop.hbase.TableNotDisabledException;
070import org.apache.hadoop.hbase.TableNotEnabledException;
071import org.apache.hadoop.hbase.TableNotFoundException;
072import org.apache.hadoop.hbase.UnknownRegionException;
073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
074import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
075import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
076import org.apache.hadoop.hbase.client.Scan.ReadType;
077import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
078import org.apache.hadoop.hbase.client.replication.TableCFs;
079import org.apache.hadoop.hbase.client.security.SecurityCapability;
080import org.apache.hadoop.hbase.exceptions.DeserializationException;
081import org.apache.hadoop.hbase.ipc.HBaseRpcController;
082import org.apache.hadoop.hbase.quotas.QuotaFilter;
083import org.apache.hadoop.hbase.quotas.QuotaSettings;
084import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
085import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
086import org.apache.hadoop.hbase.replication.ReplicationException;
087import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
088import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
089import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
090import org.apache.hadoop.hbase.security.access.Permission;
091import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
092import org.apache.hadoop.hbase.security.access.UserPermission;
093import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
094import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
095import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
096import org.apache.hadoop.hbase.util.Bytes;
097import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
098import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
099import org.apache.hadoop.hbase.util.Strings;
100import org.apache.yetus.audience.InterfaceAudience;
101import org.slf4j.Logger;
102import org.slf4j.LoggerFactory;
103
104import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
105import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
106import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
107import org.apache.hbase.thirdparty.io.netty.util.Timeout;
108import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
109import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
110
111import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
112import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
324
325/**
326 * The implementation of AsyncAdmin.
327 * <p>
328 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
329 * be finished inside the rpc framework thread, which means that the callbacks registered to the
330 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
331 * this class should not try to do time consuming tasks in the callbacks.
332 * @since 2.0.0
333 * @see AsyncHBaseAdmin
334 * @see AsyncConnection#getAdmin()
335 * @see AsyncConnection#getAdminBuilder()
336 */
337@InterfaceAudience.Private
338class RawAsyncHBaseAdmin implements AsyncAdmin {
339  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
340
341  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
342
343  private final AsyncConnectionImpl connection;
344
345  private final HashedWheelTimer retryTimer;
346
347  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
348
349  private final long rpcTimeoutNs;
350
351  private final long operationTimeoutNs;
352
353  private final long pauseNs;
354
355  private final long pauseNsForServerOverloaded;
356
357  private final int maxAttempts;
358
359  private final int startLogErrorsCnt;
360
361  private final NonceGenerator ng;
362
363  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
364    AsyncAdminBuilderBase builder) {
365    this.connection = connection;
366    this.retryTimer = retryTimer;
367    this.metaTable = connection.getTable(META_TABLE_NAME);
368    this.rpcTimeoutNs = builder.rpcTimeoutNs;
369    this.operationTimeoutNs = builder.operationTimeoutNs;
370    this.pauseNs = builder.pauseNs;
371    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
372      LOG.warn(
373        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
374          + " the normal pause value {} ms, use the greater one instead",
375        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
376        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
377      this.pauseNsForServerOverloaded = builder.pauseNs;
378    } else {
379      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
380    }
381    this.maxAttempts = builder.maxAttempts;
382    this.startLogErrorsCnt = builder.startLogErrorsCnt;
383    this.ng = connection.getNonceGenerator();
384  }
385
386  private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
387    return this.connection.callerFactory.<T> masterRequest()
388      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
389      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
390      .pause(pauseNs, TimeUnit.NANOSECONDS)
391      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
392      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
393  }
394
395  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
396    return this.connection.callerFactory.<T> adminRequest()
397      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
398      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
399      .pause(pauseNs, TimeUnit.NANOSECONDS)
400      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
401      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
402  }
403
404  @FunctionalInterface
405  private interface MasterRpcCall<RESP, REQ> {
406    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
407      RpcCallback<RESP> done);
408  }
409
410  @FunctionalInterface
411  private interface AdminRpcCall<RESP, REQ> {
412    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
413      RpcCallback<RESP> done);
414  }
415
416  @FunctionalInterface
417  private interface Converter<D, S> {
418    D convert(S src) throws IOException;
419  }
420
421  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
422    MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
423    Converter<RESP, PRESP> respConverter) {
424    CompletableFuture<RESP> future = new CompletableFuture<>();
425    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
426
427      @Override
428      public void run(PRESP resp) {
429        if (controller.failed()) {
430          future.completeExceptionally(controller.getFailed());
431        } else {
432          try {
433            future.complete(respConverter.convert(resp));
434          } catch (IOException e) {
435            future.completeExceptionally(e);
436          }
437        }
438      }
439    });
440    return future;
441  }
442
443  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
444    AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
445    Converter<RESP, PRESP> respConverter) {
446    CompletableFuture<RESP> future = new CompletableFuture<>();
447    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
448
449      @Override
450      public void run(PRESP resp) {
451        if (controller.failed()) {
452          future.completeExceptionally(controller.getFailed());
453        } else {
454          try {
455            future.complete(respConverter.convert(resp));
456          } catch (IOException e) {
457            future.completeExceptionally(e);
458          }
459        }
460      }
461    });
462    return future;
463  }
464
465  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
466    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
467    ProcedureBiConsumer consumer) {
468    return procedureCall(b -> {
469    }, preq, rpcCall, respConverter, consumer);
470  }
471
472  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
473    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
474    ProcedureBiConsumer consumer) {
475    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
476  }
477
478  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
479    Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
480    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
481    ProcedureBiConsumer consumer) {
482    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
483      stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
484    prioritySetter.accept(builder);
485    CompletableFuture<Long> procFuture = builder.call();
486    CompletableFuture<Void> future = waitProcedureResult(procFuture);
487    addListener(future, consumer);
488    return future;
489  }
490
491  @Override
492  public CompletableFuture<Boolean> tableExists(TableName tableName) {
493    if (TableName.isMetaTableName(tableName)) {
494      return CompletableFuture.completedFuture(true);
495    }
496    return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
497  }
498
499  @Override
500  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
501    return getTableDescriptors(
502      RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables));
503  }
504
505  /**
506   * {@link #listTableDescriptors(boolean)}
507   */
508  @Override
509  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
510    boolean includeSysTables) {
511    Preconditions.checkNotNull(pattern,
512      "pattern is null. If you don't specify a pattern, use listTables(boolean) instead");
513    return getTableDescriptors(
514      RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables));
515  }
516
517  @Override
518  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
519    Preconditions.checkNotNull(tableNames,
520      "tableNames is null. If you don't specify tableNames, " + "use listTables(boolean) instead");
521    if (tableNames.isEmpty()) {
522      return CompletableFuture.completedFuture(Collections.emptyList());
523    }
524    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
525  }
526
527  private CompletableFuture<List<TableDescriptor>>
528    getTableDescriptors(GetTableDescriptorsRequest request) {
529    return this.<List<TableDescriptor>> newMasterCaller()
530      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
531        List<TableDescriptor>> call(controller, stub, request,
532          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
533          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
534      .call();
535  }
536
537  @Override
538  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
539    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
540  }
541
542  @Override
543  public CompletableFuture<List<TableName>> listTableNames(Pattern pattern,
544    boolean includeSysTables) {
545    Preconditions.checkNotNull(pattern,
546      "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
547    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
548  }
549
550  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
551    return this.<List<TableName>> newMasterCaller()
552      .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse,
553        List<TableName>> call(controller, stub, request,
554          (s, c, req, done) -> s.getTableNames(c, req, done),
555          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
556      .call();
557  }
558
559  @Override
560  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
561    return this.<List<TableDescriptor>> newMasterCaller()
562      .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest,
563        ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub,
564          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
565          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
566          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
567      .call();
568  }
569
570  @Override
571  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) {
572    return this.<List<TableDescriptor>> newMasterCaller()
573      .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest,
574        ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub,
575          ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
576          (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done),
577          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
578      .call();
579  }
580
581  @Override
582  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
583    return this.<List<TableName>> newMasterCaller()
584      .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest,
585        ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub,
586          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
587          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
588          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
589      .call();
590  }
591
592  @Override
593  public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) {
594    return this.<List<TableName>> newMasterCaller()
595      .action((controller, stub) -> this.<ListTableNamesByStateRequest,
596        ListTableNamesByStateResponse, List<TableName>> call(controller, stub,
597          ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
598          (s, c, req, done) -> s.listTableNamesByState(c, req, done),
599          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
600      .call();
601  }
602
603  @Override
604  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
605    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
606    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
607      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
608        List<TableSchema>> call(controller, stub,
609          RequestConverter.buildGetTableDescriptorsRequest(tableName),
610          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
611          (resp) -> resp.getTableSchemaList()))
612      .call(), (tableSchemas, error) -> {
613        if (error != null) {
614          future.completeExceptionally(error);
615          return;
616        }
617        if (!tableSchemas.isEmpty()) {
618          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
619        } else {
620          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
621        }
622      });
623    return future;
624  }
625
626  @Override
627  public CompletableFuture<Void> createTable(TableDescriptor desc) {
628    return createTable(desc.getTableName(),
629      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
630  }
631
632  @Override
633  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
634    int numRegions) {
635    try {
636      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
637    } catch (IllegalArgumentException e) {
638      return failedFuture(e);
639    }
640  }
641
642  @Override
643  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
644    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
645      + " use createTable(TableDescriptor) instead");
646    try {
647      verifySplitKeys(splitKeys);
648      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
649        splitKeys, ng.getNonceGroup(), ng.newNonce()));
650    } catch (IllegalArgumentException e) {
651      return failedFuture(e);
652    }
653  }
654
655  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
656    Preconditions.checkNotNull(tableName, "table name is null");
657    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
658      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
659      new CreateTableProcedureBiConsumer(tableName));
660  }
661
662  @Override
663  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
664    return modifyTable(desc, true);
665  }
666
667  public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) {
668    // TODO fill the request with reopenRegions
669    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
670      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
671        ng.newNonce(), reopenRegions),
672      (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(),
673      new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
674  }
675
676  @Override
677  public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
678    return this.<ModifyTableStoreFileTrackerRequest,
679      ModifyTableStoreFileTrackerResponse> procedureCall(tableName,
680        RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT,
681          ng.getNonceGroup(), ng.newNonce()),
682        (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done),
683        (resp) -> resp.getProcId(),
684        new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName));
685  }
686
687  @Override
688  public CompletableFuture<Void> deleteTable(TableName tableName) {
689    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
690      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
691      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
692      new DeleteTableProcedureBiConsumer(tableName));
693  }
694
695  @Override
696  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
697    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
698      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
699        ng.newNonce()),
700      (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(),
701      new TruncateTableProcedureBiConsumer(tableName));
702  }
703
704  @Override
705  public CompletableFuture<Void> enableTable(TableName tableName) {
706    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
707      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
708      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
709      new EnableTableProcedureBiConsumer(tableName));
710  }
711
712  @Override
713  public CompletableFuture<Void> disableTable(TableName tableName) {
714    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
715      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
716      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
717      new DisableTableProcedureBiConsumer(tableName));
718  }
719
720  /**
721   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using
722   * passed parameters. Sets error or boolean result ('true' if table matches the passed-in
723   * targetState).
724   */
725  private static CompletableFuture<Boolean> completeCheckTableState(
726    CompletableFuture<Boolean> future, TableState tableState, Throwable error,
727    TableState.State targetState, TableName tableName) {
728    if (error != null) {
729      future.completeExceptionally(error);
730    } else {
731      if (tableState != null) {
732        future.complete(tableState.inStates(targetState));
733      } else {
734        future.completeExceptionally(new TableNotFoundException(tableName));
735      }
736    }
737    return future;
738  }
739
740  @Override
741  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
742    if (TableName.isMetaTableName(tableName)) {
743      return CompletableFuture.completedFuture(true);
744    }
745    CompletableFuture<Boolean> future = new CompletableFuture<>();
746    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> {
747      completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
748        TableState.State.ENABLED, tableName);
749    });
750    return future;
751  }
752
753  @Override
754  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
755    if (TableName.isMetaTableName(tableName)) {
756      return CompletableFuture.completedFuture(false);
757    }
758    CompletableFuture<Boolean> future = new CompletableFuture<>();
759    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> {
760      completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
761        TableState.State.DISABLED, tableName);
762    });
763    return future;
764  }
765
766  @Override
767  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
768    return isTableAvailable(tableName, Optional.empty());
769  }
770
771  @Override
772  public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) {
773    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
774      + " use isTableAvailable(TableName) instead");
775    return isTableAvailable(tableName, Optional.of(splitKeys));
776  }
777
778  private CompletableFuture<Boolean> isTableAvailable(TableName tableName,
779    Optional<byte[][]> splitKeys) {
780    if (TableName.isMetaTableName(tableName)) {
781      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
782        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
783    }
784    CompletableFuture<Boolean> future = new CompletableFuture<>();
785    addListener(isTableEnabled(tableName), (enabled, error) -> {
786      if (error != null) {
787        if (error instanceof TableNotFoundException) {
788          future.complete(false);
789        } else {
790          future.completeExceptionally(error);
791        }
792        return;
793      }
794      if (!enabled) {
795        future.complete(false);
796      } else {
797        addListener(AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
798          (locations, error1) -> {
799            if (error1 != null) {
800              future.completeExceptionally(error1);
801              return;
802            }
803            List<HRegionLocation> notDeployedRegions = locations.stream()
804              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
805            if (notDeployedRegions.size() > 0) {
806              if (LOG.isDebugEnabled()) {
807                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
808              }
809              future.complete(false);
810              return;
811            }
812
813            Optional<Boolean> available =
814              splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
815            future.complete(available.orElse(true));
816          });
817      }
818    });
819    return future;
820  }
821
822  private boolean compareRegionsWithSplitKeys(List<HRegionLocation> locations, byte[][] splitKeys) {
823    int regionCount = 0;
824    for (HRegionLocation location : locations) {
825      RegionInfo info = location.getRegion();
826      if (Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
827        regionCount++;
828        continue;
829      }
830      for (byte[] splitKey : splitKeys) {
831        // Just check if the splitkey is available
832        if (Bytes.equals(info.getStartKey(), splitKey)) {
833          regionCount++;
834          break;
835        }
836      }
837    }
838    return regionCount == splitKeys.length + 1;
839  }
840
841  @Override
842  public CompletableFuture<Void> addColumnFamily(TableName tableName,
843    ColumnFamilyDescriptor columnFamily) {
844    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
845      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
846        ng.newNonce()),
847      (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
848      new AddColumnFamilyProcedureBiConsumer(tableName));
849  }
850
851  @Override
852  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
853    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
854      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
855        ng.newNonce()),
856      (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(),
857      new DeleteColumnFamilyProcedureBiConsumer(tableName));
858  }
859
860  @Override
861  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
862    ColumnFamilyDescriptor columnFamily) {
863    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
864      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
865        ng.newNonce()),
866      (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(),
867      new ModifyColumnFamilyProcedureBiConsumer(tableName));
868  }
869
870  @Override
871  public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
872    byte[] family, String dstSFT) {
873    return this.<ModifyColumnStoreFileTrackerRequest,
874      ModifyColumnStoreFileTrackerResponse> procedureCall(tableName,
875        RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT,
876          ng.getNonceGroup(), ng.newNonce()),
877        (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done),
878        (resp) -> resp.getProcId(),
879        new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName));
880  }
881
882  @Override
883  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
884    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
885      RequestConverter.buildCreateNamespaceRequest(descriptor),
886      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
887      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
888  }
889
890  @Override
891  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
892    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
893      RequestConverter.buildModifyNamespaceRequest(descriptor),
894      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
895      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
896  }
897
898  @Override
899  public CompletableFuture<Void> deleteNamespace(String name) {
900    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
901      RequestConverter.buildDeleteNamespaceRequest(name),
902      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
903      new DeleteNamespaceProcedureBiConsumer(name));
904  }
905
906  @Override
907  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
908    return this.<NamespaceDescriptor> newMasterCaller()
909      .action((controller, stub) -> this.<GetNamespaceDescriptorRequest,
910        GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub,
911          RequestConverter.buildGetNamespaceDescriptorRequest(name),
912          (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done),
913          (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor())))
914      .call();
915  }
916
917  @Override
918  public CompletableFuture<List<String>> listNamespaces() {
919    return this.<List<String>> newMasterCaller()
920      .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse,
921        List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(),
922          (s, c, req, done) -> s.listNamespaces(c, req, done),
923          (resp) -> resp.getNamespaceNameList()))
924      .call();
925  }
926
927  @Override
928  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
929    return this.<List<NamespaceDescriptor>> newMasterCaller()
930      .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest,
931        ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub,
932          ListNamespaceDescriptorsRequest.newBuilder().build(),
933          (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done),
934          (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp)))
935      .call();
936  }
937
938  @Override
939  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
940    return this.<List<RegionInfo>> newAdminCaller()
941      .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse,
942        List<RegionInfo>> adminCall(controller, stub,
943          RequestConverter.buildGetOnlineRegionRequest(),
944          (s, c, req, done) -> s.getOnlineRegion(c, req, done),
945          resp -> ProtobufUtil.getRegionInfos(resp)))
946      .serverName(serverName).call();
947  }
948
949  @Override
950  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
951    if (tableName.equals(META_TABLE_NAME)) {
952      return connection.registry.getMetaRegionLocations()
953        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
954          .collect(Collectors.toList()));
955    } else {
956      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply(
957        locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
958    }
959  }
960
961  @Override
962  public CompletableFuture<Void> flush(TableName tableName) {
963    return flush(tableName, Collections.emptyList());
964  }
965
966  @Override
967  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
968    return flush(tableName, Collections.singletonList(columnFamily));
969  }
970
971  @Override
972  public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) {
973    // This is for keeping compatibility with old implementation.
974    // If the server version is lower than the client version, it's possible that the
975    // flushTable method is not present in the server side, if so, we need to fall back
976    // to the old implementation.
977    List<byte[]> columnFamilies = columnFamilyList.stream()
978      .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
979    FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies,
980      ng.getNonceGroup(), ng.newNonce());
981    CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall(
982      tableName, request, (s, c, req, done) -> s.flushTable(c, req, done),
983      (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName));
984    CompletableFuture<Void> future = new CompletableFuture<>();
985    addListener(procFuture, (ret, error) -> {
986      if (error != null) {
987        if (error instanceof TableNotFoundException || error instanceof TableNotEnabledException) {
988          future.completeExceptionally(error);
989        } else if (error instanceof DoNotRetryIOException) {
990          // usually this is caused by the method is not present on the server or
991          // the hbase hadoop version does not match the running hadoop version.
992          // if that happens, we need fall back to the old flush implementation.
993          LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error);
994          legacyFlush(future, tableName, columnFamilies);
995        } else {
996          future.completeExceptionally(error);
997        }
998      } else {
999        future.complete(ret);
1000      }
1001    });
1002    return future;
1003  }
1004
1005  private void legacyFlush(CompletableFuture<Void> future, TableName tableName,
1006    List<byte[]> columnFamilies) {
1007    addListener(tableExists(tableName), (exists, err) -> {
1008      if (err != null) {
1009        future.completeExceptionally(err);
1010      } else if (!exists) {
1011        future.completeExceptionally(new TableNotFoundException(tableName));
1012      } else {
1013        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
1014          if (err2 != null) {
1015            future.completeExceptionally(err2);
1016          } else if (!tableEnabled) {
1017            future.completeExceptionally(new TableNotEnabledException(tableName));
1018          } else {
1019            Map<String, String> props = new HashMap<>();
1020            if (columnFamilies != null && !columnFamilies.isEmpty()) {
1021              props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
1022                .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
1023            }
1024            addListener(
1025              execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
1026              (ret, err3) -> {
1027                if (err3 != null) {
1028                  future.completeExceptionally(err3);
1029                } else {
1030                  future.complete(ret);
1031                }
1032              });
1033          }
1034        });
1035      }
1036    });
1037  }
1038
1039  @Override
1040  public CompletableFuture<Void> flushRegion(byte[] regionName) {
1041    return flushRegion(regionName, null);
1042  }
1043
1044  @Override
1045  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
1046    CompletableFuture<Void> future = new CompletableFuture<>();
1047    addListener(getRegionLocation(regionName), (location, err) -> {
1048      if (err != null) {
1049        future.completeExceptionally(err);
1050        return;
1051      }
1052      ServerName serverName = location.getServerName();
1053      if (serverName == null) {
1054        future
1055          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1056        return;
1057      }
1058      addListener(flush(serverName, location.getRegion(), columnFamily), (ret, err2) -> {
1059        if (err2 != null) {
1060          future.completeExceptionally(err2);
1061        } else {
1062          future.complete(ret);
1063        }
1064      });
1065    });
1066    return future;
1067  }
1068
1069  private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo,
1070    byte[] columnFamily) {
1071    return this.<Void> newAdminCaller().serverName(serverName)
1072      .action(
1073        (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse,
1074          Void> adminCall(controller, stub, RequestConverter
1075            .buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily, false),
1076            (s, c, req, done) -> s.flushRegion(c, req, done), resp -> null))
1077      .call();
1078  }
1079
1080  @Override
1081  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
1082    CompletableFuture<Void> future = new CompletableFuture<>();
1083    addListener(getRegions(sn), (hRegionInfos, err) -> {
1084      if (err != null) {
1085        future.completeExceptionally(err);
1086        return;
1087      }
1088      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1089      if (hRegionInfos != null) {
1090        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, null)));
1091      }
1092      addListener(CompletableFuture.allOf(
1093        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1094          if (err2 != null) {
1095            future.completeExceptionally(err2);
1096          } else {
1097            future.complete(ret);
1098          }
1099        });
1100    });
1101    return future;
1102  }
1103
1104  @Override
1105  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1106    return compact(tableName, null, false, compactType);
1107  }
1108
1109  @Override
1110  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1111    CompactType compactType) {
1112    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1113      + "If you don't specify a columnFamily, use compact(TableName) instead");
1114    return compact(tableName, columnFamily, false, compactType);
1115  }
1116
1117  @Override
1118  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1119    return compactRegion(regionName, null, false);
1120  }
1121
1122  @Override
1123  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1124    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1125      + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1126    return compactRegion(regionName, columnFamily, false);
1127  }
1128
1129  @Override
1130  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1131    return compact(tableName, null, true, compactType);
1132  }
1133
1134  @Override
1135  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1136    CompactType compactType) {
1137    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1138      + "If you don't specify a columnFamily, use compact(TableName) instead");
1139    return compact(tableName, columnFamily, true, compactType);
1140  }
1141
1142  @Override
1143  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1144    return compactRegion(regionName, null, true);
1145  }
1146
1147  @Override
1148  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1149    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1150      + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1151    return compactRegion(regionName, columnFamily, true);
1152  }
1153
1154  @Override
1155  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1156    return compactRegionServer(sn, false);
1157  }
1158
1159  @Override
1160  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1161    return compactRegionServer(sn, true);
1162  }
1163
1164  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1165    CompletableFuture<Void> future = new CompletableFuture<>();
1166    addListener(getRegions(sn), (hRegionInfos, err) -> {
1167      if (err != null) {
1168        future.completeExceptionally(err);
1169        return;
1170      }
1171      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1172      if (hRegionInfos != null) {
1173        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1174      }
1175      addListener(CompletableFuture.allOf(
1176        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1177          if (err2 != null) {
1178            future.completeExceptionally(err2);
1179          } else {
1180            future.complete(ret);
1181          }
1182        });
1183    });
1184    return future;
1185  }
1186
1187  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1188    boolean major) {
1189    CompletableFuture<Void> future = new CompletableFuture<>();
1190    addListener(getRegionLocation(regionName), (location, err) -> {
1191      if (err != null) {
1192        future.completeExceptionally(err);
1193        return;
1194      }
1195      ServerName serverName = location.getServerName();
1196      if (serverName == null) {
1197        future
1198          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1199        return;
1200      }
1201      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1202        (ret, err2) -> {
1203          if (err2 != null) {
1204            future.completeExceptionally(err2);
1205          } else {
1206            future.complete(ret);
1207          }
1208        });
1209    });
1210    return future;
1211  }
1212
1213  /**
1214   * List all region locations for the specific table.
1215   */
1216  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1217    if (TableName.META_TABLE_NAME.equals(tableName)) {
1218      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1219      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1220        if (err != null) {
1221          future.completeExceptionally(err);
1222        } else if (
1223          metaRegions == null || metaRegions.isEmpty()
1224            || metaRegions.getDefaultRegionLocation() == null
1225        ) {
1226          future.completeExceptionally(new IOException("meta region does not found"));
1227        } else {
1228          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1229        }
1230      });
1231      return future;
1232    } else {
1233      // For non-meta table, we fetch all locations by scanning hbase:meta table
1234      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1235    }
1236  }
1237
1238  /**
1239   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1240   */
1241  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1242    CompactType compactType) {
1243    CompletableFuture<Void> future = new CompletableFuture<>();
1244
1245    switch (compactType) {
1246      case MOB:
1247        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1248          if (err != null) {
1249            future.completeExceptionally(err);
1250            return;
1251          }
1252          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1253          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1254            if (err2 != null) {
1255              future.completeExceptionally(err2);
1256            } else {
1257              future.complete(ret);
1258            }
1259          });
1260        });
1261        break;
1262      case NORMAL:
1263        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1264          if (err != null) {
1265            future.completeExceptionally(err);
1266            return;
1267          }
1268          if (locations == null || locations.isEmpty()) {
1269            future.completeExceptionally(new TableNotFoundException(tableName));
1270          }
1271          CompletableFuture<?>[] compactFutures =
1272            locations.stream().filter(l -> l.getRegion() != null)
1273              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1274              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1275              .toArray(CompletableFuture<?>[]::new);
1276          // future complete unless all of the compact futures are completed.
1277          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1278            if (err2 != null) {
1279              future.completeExceptionally(err2);
1280            } else {
1281              future.complete(ret);
1282            }
1283          });
1284        });
1285        break;
1286      default:
1287        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1288    }
1289    return future;
1290  }
1291
1292  /**
1293   * Compact the region at specific region server.
1294   */
1295  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1296    final boolean major, byte[] columnFamily) {
1297    return this.<Void> newAdminCaller().serverName(sn)
1298      .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse,
1299        Void> adminCall(controller, stub,
1300          RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily),
1301          (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
1302      .call();
1303  }
1304
1305  private byte[] toEncodeRegionName(byte[] regionName) {
1306    return RegionInfo.isEncodedRegionName(regionName)
1307      ? regionName
1308      : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1309  }
1310
1311  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1312    CompletableFuture<TableName> result) {
1313    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1314      if (err != null) {
1315        result.completeExceptionally(err);
1316        return;
1317      }
1318      RegionInfo regionInfo = location.getRegion();
1319      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1320        result.completeExceptionally(
1321          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1322        return;
1323      }
1324      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1325        if (!tableName.get().equals(regionInfo.getTable())) {
1326          // tables of this two region should be same.
1327          result.completeExceptionally(
1328            new IllegalArgumentException("Cannot merge regions from two different tables "
1329              + tableName.get() + " and " + regionInfo.getTable()));
1330        } else {
1331          result.complete(tableName.get());
1332        }
1333      }
1334    });
1335  }
1336
1337  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1338    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1339    CompletableFuture<TableName> future = new CompletableFuture<>();
1340    for (byte[] encodedRegionName : encodedRegionNames) {
1341      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1342    }
1343    return future;
1344  }
1345
1346  @Override
1347  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1348    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1349  }
1350
1351  @Override
1352  public CompletableFuture<Boolean> isMergeEnabled() {
1353    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1354  }
1355
1356  @Override
1357  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1358    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1359  }
1360
1361  @Override
1362  public CompletableFuture<Boolean> isSplitEnabled() {
1363    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1364  }
1365
1366  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1367    MasterSwitchType switchType) {
1368    SetSplitOrMergeEnabledRequest request =
1369      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1370    return this.<Boolean> newMasterCaller()
1371      .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest,
1372        SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1373          (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1374          (resp) -> resp.getPrevValueList().get(0)))
1375      .call();
1376  }
1377
1378  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1379    IsSplitOrMergeEnabledRequest request =
1380      RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1381    return this.<Boolean> newMasterCaller()
1382      .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest,
1383        IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1384          (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled()))
1385      .call();
1386  }
1387
1388  @Override
1389  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1390    if (nameOfRegionsToMerge.size() < 2) {
1391      return failedFuture(new IllegalArgumentException(
1392        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1393    }
1394    CompletableFuture<Void> future = new CompletableFuture<>();
1395    byte[][] encodedNameOfRegionsToMerge =
1396      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1397
1398    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1399      if (err != null) {
1400        future.completeExceptionally(err);
1401        return;
1402      }
1403
1404      final MergeTableRegionsRequest request;
1405      try {
1406        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1407          forcible, ng.getNonceGroup(), ng.newNonce());
1408      } catch (DeserializationException e) {
1409        future.completeExceptionally(e);
1410        return;
1411      }
1412
1413      addListener(
1414        this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions,
1415          MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)),
1416        (ret, err2) -> {
1417          if (err2 != null) {
1418            future.completeExceptionally(err2);
1419          } else {
1420            future.complete(ret);
1421          }
1422        });
1423    });
1424    return future;
1425  }
1426
1427  @Override
1428  public CompletableFuture<Void> split(TableName tableName) {
1429    CompletableFuture<Void> future = new CompletableFuture<>();
1430    addListener(tableExists(tableName), (exist, error) -> {
1431      if (error != null) {
1432        future.completeExceptionally(error);
1433        return;
1434      }
1435      if (!exist) {
1436        future.completeExceptionally(new TableNotFoundException(tableName));
1437        return;
1438      }
1439      addListener(
1440        metaTable
1441          .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1442            .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
1443            .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
1444        (results, err2) -> {
1445          if (err2 != null) {
1446            future.completeExceptionally(err2);
1447            return;
1448          }
1449          if (results != null && !results.isEmpty()) {
1450            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1451            for (Result r : results) {
1452              if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
1453                continue;
1454              }
1455              RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
1456              if (rl != null) {
1457                for (HRegionLocation h : rl.getRegionLocations()) {
1458                  if (h != null && h.getServerName() != null) {
1459                    RegionInfo hri = h.getRegion();
1460                    if (
1461                      hri == null || hri.isSplitParent()
1462                        || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID
1463                    ) {
1464                      continue;
1465                    }
1466                    splitFutures.add(split(hri, null));
1467                  }
1468                }
1469              }
1470            }
1471            addListener(
1472              CompletableFuture
1473                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1474              (ret, exception) -> {
1475                if (exception != null) {
1476                  future.completeExceptionally(exception);
1477                  return;
1478                }
1479                future.complete(ret);
1480              });
1481          } else {
1482            future.complete(null);
1483          }
1484        });
1485    });
1486    return future;
1487  }
1488
1489  @Override
1490  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1491    CompletableFuture<Void> result = new CompletableFuture<>();
1492    if (splitPoint == null) {
1493      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1494    }
1495    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1496      (loc, err) -> {
1497        if (err != null) {
1498          result.completeExceptionally(err);
1499        } else if (loc == null || loc.getRegion() == null) {
1500          result.completeExceptionally(new IllegalArgumentException(
1501            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1502        } else {
1503          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1504            if (err2 != null) {
1505              result.completeExceptionally(err2);
1506            } else {
1507              result.complete(ret);
1508            }
1509
1510          });
1511        }
1512      });
1513    return result;
1514  }
1515
1516  @Override
1517  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1518    CompletableFuture<Void> future = new CompletableFuture<>();
1519    addListener(getRegionLocation(regionName), (location, err) -> {
1520      if (err != null) {
1521        future.completeExceptionally(err);
1522        return;
1523      }
1524      RegionInfo regionInfo = location.getRegion();
1525      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1526        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1527          + "Replicas are auto-split when their primary is split."));
1528        return;
1529      }
1530      ServerName serverName = location.getServerName();
1531      if (serverName == null) {
1532        future
1533          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1534        return;
1535      }
1536      addListener(split(regionInfo, null), (ret, err2) -> {
1537        if (err2 != null) {
1538          future.completeExceptionally(err2);
1539        } else {
1540          future.complete(ret);
1541        }
1542      });
1543    });
1544    return future;
1545  }
1546
1547  @Override
1548  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1549    Preconditions.checkNotNull(splitPoint,
1550      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1551    CompletableFuture<Void> future = new CompletableFuture<>();
1552    addListener(getRegionLocation(regionName), (location, err) -> {
1553      if (err != null) {
1554        future.completeExceptionally(err);
1555        return;
1556      }
1557      RegionInfo regionInfo = location.getRegion();
1558      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1559        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1560          + "Replicas are auto-split when their primary is split."));
1561        return;
1562      }
1563      ServerName serverName = location.getServerName();
1564      if (serverName == null) {
1565        future
1566          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1567        return;
1568      }
1569      if (
1570        regionInfo.getStartKey() != null
1571          && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0
1572      ) {
1573        future.completeExceptionally(
1574          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1575        return;
1576      }
1577      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1578        if (err2 != null) {
1579          future.completeExceptionally(err2);
1580        } else {
1581          future.complete(ret);
1582        }
1583      });
1584    });
1585    return future;
1586  }
1587
1588  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1589    CompletableFuture<Void> future = new CompletableFuture<>();
1590    TableName tableName = hri.getTable();
1591    final SplitTableRegionRequest request;
1592    try {
1593      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1594        ng.newNonce());
1595    } catch (DeserializationException e) {
1596      future.completeExceptionally(e);
1597      return future;
1598    }
1599
1600    addListener(
1601      this.procedureCall(tableName, request, MasterService.Interface::splitRegion,
1602        SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)),
1603      (ret, err2) -> {
1604        if (err2 != null) {
1605          future.completeExceptionally(err2);
1606        } else {
1607          future.complete(ret);
1608        }
1609      });
1610    return future;
1611  }
1612
1613  @Override
1614  public CompletableFuture<Void> truncateRegion(byte[] regionName) {
1615    CompletableFuture<Void> future = new CompletableFuture<>();
1616    addListener(getRegionLocation(regionName), (location, err) -> {
1617      if (err != null) {
1618        future.completeExceptionally(err);
1619        return;
1620      }
1621      RegionInfo regionInfo = location.getRegion();
1622      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1623        future.completeExceptionally(new IllegalArgumentException(
1624          "Can't truncate replicas directly.Replicas are auto-truncated "
1625            + "when their primary is truncated."));
1626        return;
1627      }
1628      ServerName serverName = location.getServerName();
1629      if (serverName == null) {
1630        future
1631          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1632        return;
1633      }
1634      addListener(truncateRegion(regionInfo), (ret, err2) -> {
1635        if (err2 != null) {
1636          future.completeExceptionally(err2);
1637        } else {
1638          future.complete(ret);
1639        }
1640      });
1641    });
1642    return future;
1643  }
1644
1645  private CompletableFuture<Void> truncateRegion(final RegionInfo hri) {
1646    CompletableFuture<Void> future = new CompletableFuture<>();
1647    TableName tableName = hri.getTable();
1648    final MasterProtos.TruncateRegionRequest request;
1649    try {
1650      request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce());
1651    } catch (DeserializationException e) {
1652      future.completeExceptionally(e);
1653      return future;
1654    }
1655    addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion,
1656      MasterProtos.TruncateRegionResponse::getProcId,
1657      new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
1658        if (err2 != null) {
1659          future.completeExceptionally(err2);
1660        } else {
1661          future.complete(ret);
1662        }
1663      });
1664    return future;
1665  }
1666
1667  @Override
1668  public CompletableFuture<Void> assign(byte[] regionName) {
1669    CompletableFuture<Void> future = new CompletableFuture<>();
1670    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1671      if (err != null) {
1672        future.completeExceptionally(err);
1673        return;
1674      }
1675      addListener(
1676        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1677          .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1678            controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1679            (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))
1680          .call(),
1681        (ret, err2) -> {
1682          if (err2 != null) {
1683            future.completeExceptionally(err2);
1684          } else {
1685            future.complete(ret);
1686          }
1687        });
1688    });
1689    return future;
1690  }
1691
1692  @Override
1693  public CompletableFuture<Void> unassign(byte[] regionName) {
1694    CompletableFuture<Void> future = new CompletableFuture<>();
1695    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1696      if (err != null) {
1697        future.completeExceptionally(err);
1698        return;
1699      }
1700      addListener(
1701        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1702          .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse,
1703            Void> call(controller, stub,
1704              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1705              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))
1706          .call(),
1707        (ret, err2) -> {
1708          if (err2 != null) {
1709            future.completeExceptionally(err2);
1710          } else {
1711            future.complete(ret);
1712          }
1713        });
1714    });
1715    return future;
1716  }
1717
1718  @Override
1719  public CompletableFuture<Void> offline(byte[] regionName) {
1720    CompletableFuture<Void> future = new CompletableFuture<>();
1721    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1722      if (err != null) {
1723        future.completeExceptionally(err);
1724        return;
1725      }
1726      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1727        .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
1728          controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1729          (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))
1730        .call(), (ret, err2) -> {
1731          if (err2 != null) {
1732            future.completeExceptionally(err2);
1733          } else {
1734            future.complete(ret);
1735          }
1736        });
1737    });
1738    return future;
1739  }
1740
1741  @Override
1742  public CompletableFuture<Void> move(byte[] regionName) {
1743    CompletableFuture<Void> future = new CompletableFuture<>();
1744    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1745      if (err != null) {
1746        future.completeExceptionally(err);
1747        return;
1748      }
1749      addListener(
1750        moveRegion(regionInfo,
1751          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1752        (ret, err2) -> {
1753          if (err2 != null) {
1754            future.completeExceptionally(err2);
1755          } else {
1756            future.complete(ret);
1757          }
1758        });
1759    });
1760    return future;
1761  }
1762
1763  @Override
1764  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1765    Preconditions.checkNotNull(destServerName,
1766      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1767    CompletableFuture<Void> future = new CompletableFuture<>();
1768    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1769      if (err != null) {
1770        future.completeExceptionally(err);
1771        return;
1772      }
1773      addListener(
1774        moveRegion(regionInfo, RequestConverter
1775          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1776        (ret, err2) -> {
1777          if (err2 != null) {
1778            future.completeExceptionally(err2);
1779          } else {
1780            future.complete(ret);
1781          }
1782        });
1783    });
1784    return future;
1785  }
1786
1787  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1788    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1789      .action(
1790        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1791          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1792      .call();
1793  }
1794
1795  @Override
1796  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1797    return this.<Void> newMasterCaller()
1798      .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1799        stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1800        (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null))
1801      .call();
1802  }
1803
1804  @Override
1805  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1806    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1807    Scan scan = QuotaTableUtil.makeScan(filter);
1808    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan,
1809      new AdvancedScanResultConsumer() {
1810        List<QuotaSettings> settings = new ArrayList<>();
1811
1812        @Override
1813        public void onNext(Result[] results, ScanController controller) {
1814          for (Result result : results) {
1815            try {
1816              QuotaTableUtil.parseResultToCollection(result, settings);
1817            } catch (IOException e) {
1818              controller.terminate();
1819              future.completeExceptionally(e);
1820            }
1821          }
1822        }
1823
1824        @Override
1825        public void onError(Throwable error) {
1826          future.completeExceptionally(error);
1827        }
1828
1829        @Override
1830        public void onComplete() {
1831          future.complete(settings);
1832        }
1833      });
1834    return future;
1835  }
1836
1837  @Override
1838  public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig,
1839    boolean enabled) {
1840    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1841      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1842      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1843      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1844  }
1845
1846  @Override
1847  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1848    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1849      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1850      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1851      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1852  }
1853
1854  @Override
1855  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1856    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1857      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1858      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1859      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1860  }
1861
1862  @Override
1863  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1864    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1865      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1866      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1867      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1868  }
1869
1870  @Override
1871  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1872    return this.<ReplicationPeerConfig> newMasterCaller()
1873      .action((controller, stub) -> this.<GetReplicationPeerConfigRequest,
1874        GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub,
1875          RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1876          (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1877          (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1878      .call();
1879  }
1880
1881  @Override
1882  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1883    ReplicationPeerConfig peerConfig) {
1884    return this.<UpdateReplicationPeerConfigRequest,
1885      UpdateReplicationPeerConfigResponse> procedureCall(
1886        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1887        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1888        (resp) -> resp.getProcId(),
1889        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1890  }
1891
1892  @Override
1893  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1894    Map<TableName, List<String>> tableCfs) {
1895    if (tableCfs == null) {
1896      return failedFuture(new ReplicationException("tableCfs is null"));
1897    }
1898
1899    CompletableFuture<Void> future = new CompletableFuture<Void>();
1900    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1901      if (!completeExceptionally(future, error)) {
1902        ReplicationPeerConfig newPeerConfig =
1903          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1904        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1905          if (!completeExceptionally(future, error)) {
1906            future.complete(result);
1907          }
1908        });
1909      }
1910    });
1911    return future;
1912  }
1913
1914  @Override
1915  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1916    Map<TableName, List<String>> tableCfs) {
1917    if (tableCfs == null) {
1918      return failedFuture(new ReplicationException("tableCfs is null"));
1919    }
1920
1921    CompletableFuture<Void> future = new CompletableFuture<Void>();
1922    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1923      if (!completeExceptionally(future, error)) {
1924        ReplicationPeerConfig newPeerConfig = null;
1925        try {
1926          newPeerConfig = ReplicationPeerConfigUtil
1927            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1928        } catch (ReplicationException e) {
1929          future.completeExceptionally(e);
1930          return;
1931        }
1932        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1933          if (!completeExceptionally(future, error)) {
1934            future.complete(result);
1935          }
1936        });
1937      }
1938    });
1939    return future;
1940  }
1941
1942  @Override
1943  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1944    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1945  }
1946
1947  @Override
1948  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1949    Preconditions.checkNotNull(pattern,
1950      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1951    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1952  }
1953
1954  private CompletableFuture<List<ReplicationPeerDescription>>
1955    listReplicationPeers(ListReplicationPeersRequest request) {
1956    return this.<List<ReplicationPeerDescription>> newMasterCaller()
1957      .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1958        List<ReplicationPeerDescription>> call(controller, stub, request,
1959          (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1960          (resp) -> resp.getPeerDescList().stream()
1961            .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1962            .collect(Collectors.toList())))
1963      .call();
1964  }
1965
1966  @Override
1967  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1968    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1969    addListener(listTableDescriptors(), (tables, error) -> {
1970      if (!completeExceptionally(future, error)) {
1971        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1972        tables.forEach(table -> {
1973          Map<String, Integer> cfs = new HashMap<>();
1974          Stream.of(table.getColumnFamilies())
1975            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1976            .forEach(column -> {
1977              cfs.put(column.getNameAsString(), column.getScope());
1978            });
1979          if (!cfs.isEmpty()) {
1980            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1981          }
1982        });
1983        future.complete(replicatedTableCFs);
1984      }
1985    });
1986    return future;
1987  }
1988
1989  @Override
1990  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1991    SnapshotProtos.SnapshotDescription snapshot =
1992      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1993    try {
1994      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1995    } catch (IllegalArgumentException e) {
1996      return failedFuture(e);
1997    }
1998    CompletableFuture<Void> future = new CompletableFuture<>();
1999    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
2000      .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build();
2001    addListener(this.<SnapshotResponse> newMasterCaller()
2002      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(
2003        controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
2004      .call(), (resp, err) -> {
2005        if (err != null) {
2006          future.completeExceptionally(err);
2007          return;
2008        }
2009        waitSnapshotFinish(snapshotDesc, future, resp);
2010      });
2011    return future;
2012  }
2013
2014  // This is for keeping compatibility with old implementation.
2015  // If there is a procId field in the response, then the snapshot will be operated with a
2016  // SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
2017  private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future,
2018    SnapshotResponse resp) {
2019    if (resp.hasProcId()) {
2020      getProcedureResult(resp.getProcId(), future, 0);
2021      addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
2022    } else {
2023      long expectedTimeout = resp.getExpectedTimeout();
2024      TimerTask pollingTask = new TimerTask() {
2025        int tries = 0;
2026        long startTime = EnvironmentEdgeManager.currentTime();
2027        long endTime = startTime + expectedTimeout;
2028        long maxPauseTime = expectedTimeout / maxAttempts;
2029
2030        @Override
2031        public void run(Timeout timeout) throws Exception {
2032          if (EnvironmentEdgeManager.currentTime() < endTime) {
2033            addListener(isSnapshotFinished(snapshot), (done, err2) -> {
2034              if (err2 != null) {
2035                future.completeExceptionally(err2);
2036              } else if (done) {
2037                future.complete(null);
2038              } else {
2039                // retry again after pauseTime.
2040                long pauseTime =
2041                  ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2042                pauseTime = Math.min(pauseTime, maxPauseTime);
2043                AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
2044              }
2045            });
2046          } else {
2047            future
2048              .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName()
2049                + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot));
2050          }
2051        }
2052      };
2053      AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2054    }
2055  }
2056
2057  @Override
2058  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
2059    return this.<Boolean> newMasterCaller()
2060      .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse,
2061        Boolean> call(controller, stub,
2062          IsSnapshotDoneRequest.newBuilder()
2063            .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2064          (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone()))
2065      .call();
2066  }
2067
2068  @Override
2069  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
2070    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
2071      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
2072      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
2073    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2074  }
2075
2076  @Override
2077  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
2078    boolean restoreAcl) {
2079    CompletableFuture<Void> future = new CompletableFuture<>();
2080    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
2081      if (err != null) {
2082        future.completeExceptionally(err);
2083        return;
2084      }
2085      TableName tableName = null;
2086      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
2087        for (SnapshotDescription snap : snapshotDescriptions) {
2088          if (snap.getName().equals(snapshotName)) {
2089            tableName = snap.getTableName();
2090            break;
2091          }
2092        }
2093      }
2094      if (tableName == null) {
2095        future.completeExceptionally(new RestoreSnapshotException(
2096          "Unable to find the table name for snapshot=" + snapshotName));
2097        return;
2098      }
2099      final TableName finalTableName = tableName;
2100      addListener(tableExists(finalTableName), (exists, err2) -> {
2101        if (err2 != null) {
2102          future.completeExceptionally(err2);
2103        } else if (!exists) {
2104          // if table does not exist, then just clone snapshot into new table.
2105          completeConditionalOnFuture(future,
2106            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
2107        } else {
2108          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
2109            if (err4 != null) {
2110              future.completeExceptionally(err4);
2111            } else if (!disabled) {
2112              future.completeExceptionally(new TableNotDisabledException(finalTableName));
2113            } else {
2114              completeConditionalOnFuture(future,
2115                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
2116            }
2117          });
2118        }
2119      });
2120    });
2121    return future;
2122  }
2123
2124  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
2125    boolean takeFailSafeSnapshot, boolean restoreAcl) {
2126    if (takeFailSafeSnapshot) {
2127      CompletableFuture<Void> future = new CompletableFuture<>();
2128      // Step.1 Take a snapshot of the current state
2129      String failSafeSnapshotSnapshotNameFormat =
2130        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
2131          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
2132      final String failSafeSnapshotSnapshotName =
2133        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
2134          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2135          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2136      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2137      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
2138        if (err != null) {
2139          future.completeExceptionally(err);
2140        } else {
2141          // Step.2 Restore snapshot
2142          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
2143            (void2, err2) -> {
2144              if (err2 != null) {
2145                // Step.3.a Something went wrong during the restore and try to rollback.
2146                addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName,
2147                  restoreAcl, null), (void3, err3) -> {
2148                    if (err3 != null) {
2149                      future.completeExceptionally(err3);
2150                    } else {
2151                      String msg =
2152                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
2153                          + failSafeSnapshotSnapshotName + " succeeded.";
2154                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2155                    }
2156                  });
2157              } else {
2158                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2159                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2160                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2161                  if (err3 != null) {
2162                    LOG.error(
2163                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2164                      err3);
2165                    future.completeExceptionally(err3);
2166                  } else {
2167                    future.complete(ret3);
2168                  }
2169                });
2170              }
2171            });
2172        }
2173      });
2174      return future;
2175    } else {
2176      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
2177    }
2178  }
2179
2180  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2181    CompletableFuture<T> parentFuture) {
2182    addListener(parentFuture, (res, err) -> {
2183      if (err != null) {
2184        dependentFuture.completeExceptionally(err);
2185      } else {
2186        dependentFuture.complete(res);
2187      }
2188    });
2189  }
2190
2191  @Override
2192  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2193    boolean restoreAcl, String customSFT) {
2194    CompletableFuture<Void> future = new CompletableFuture<>();
2195    addListener(tableExists(tableName), (exists, err) -> {
2196      if (err != null) {
2197        future.completeExceptionally(err);
2198      } else if (exists) {
2199        future.completeExceptionally(new TableExistsException(tableName));
2200      } else {
2201        completeConditionalOnFuture(future,
2202          internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
2203      }
2204    });
2205    return future;
2206  }
2207
2208  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2209    boolean restoreAcl, String customSFT) {
2210    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2211      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2212    try {
2213      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2214    } catch (IllegalArgumentException e) {
2215      return failedFuture(e);
2216    }
2217    RestoreSnapshotRequest.Builder builder =
2218      RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2219        .setNonce(ng.newNonce()).setRestoreACL(restoreAcl);
2220    if (customSFT != null) {
2221      builder.setCustomSFT(customSFT);
2222    }
2223    return waitProcedureResult(this.<Long> newMasterCaller()
2224      .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse,
2225        Long> call(controller, stub, builder.build(),
2226          (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2227      .call());
2228  }
2229
2230  @Override
2231  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2232    return getCompletedSnapshots(null);
2233  }
2234
2235  @Override
2236  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2237    Preconditions.checkNotNull(pattern,
2238      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2239    return getCompletedSnapshots(pattern);
2240  }
2241
2242  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2243    return this.<List<SnapshotDescription>> newMasterCaller()
2244      .action((controller, stub) -> this.<GetCompletedSnapshotsRequest,
2245        GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub,
2246          GetCompletedSnapshotsRequest.newBuilder().build(),
2247          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2248          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2249      .call();
2250  }
2251
2252  @Override
2253  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2254    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2255      + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2256    return getCompletedSnapshots(tableNamePattern, null);
2257  }
2258
2259  @Override
2260  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2261    Pattern snapshotNamePattern) {
2262    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2263      + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2264    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2265      + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2266    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2267  }
2268
2269  private CompletableFuture<List<SnapshotDescription>>
2270    getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) {
2271    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2272    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2273      if (err != null) {
2274        future.completeExceptionally(err);
2275        return;
2276      }
2277      if (tableNames == null || tableNames.size() <= 0) {
2278        future.complete(Collections.emptyList());
2279        return;
2280      }
2281      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2282        if (err2 != null) {
2283          future.completeExceptionally(err2);
2284          return;
2285        }
2286        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2287          future.complete(Collections.emptyList());
2288          return;
2289        }
2290        future.complete(snapshotDescList.stream()
2291          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2292          .collect(Collectors.toList()));
2293      });
2294    });
2295    return future;
2296  }
2297
2298  @Override
2299  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2300    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2301  }
2302
2303  @Override
2304  public CompletableFuture<Void> deleteSnapshots() {
2305    return internalDeleteSnapshots(null, null);
2306  }
2307
2308  @Override
2309  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2310    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2311      + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2312    return internalDeleteSnapshots(null, snapshotNamePattern);
2313  }
2314
2315  @Override
2316  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2317    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2318      + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2319    return internalDeleteSnapshots(tableNamePattern, null);
2320  }
2321
2322  @Override
2323  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2324    Pattern snapshotNamePattern) {
2325    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2326      + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2327    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2328      + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2329    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2330  }
2331
2332  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2333    Pattern snapshotNamePattern) {
2334    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2335    if (tableNamePattern == null) {
2336      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2337    } else {
2338      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2339    }
2340    CompletableFuture<Void> future = new CompletableFuture<>();
2341    addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> {
2342      if (err != null) {
2343        future.completeExceptionally(err);
2344        return;
2345      }
2346      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2347        future.complete(null);
2348        return;
2349      }
2350      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2351        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2352          if (e != null) {
2353            future.completeExceptionally(e);
2354          } else {
2355            future.complete(v);
2356          }
2357        });
2358    });
2359    return future;
2360  }
2361
2362  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2363    return this.<Void> newMasterCaller()
2364      .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2365        controller, stub,
2366        DeleteSnapshotRequest.newBuilder()
2367          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2368        (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null))
2369      .call();
2370  }
2371
2372  @Override
2373  public CompletableFuture<Void> execProcedure(String signature, String instance,
2374    Map<String, String> props) {
2375    CompletableFuture<Void> future = new CompletableFuture<>();
2376    ProcedureDescription procDesc =
2377      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2378    addListener(this.<Long> newMasterCaller()
2379      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2380        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2381        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2382      .call(), (expectedTimeout, err) -> {
2383        if (err != null) {
2384          future.completeExceptionally(err);
2385          return;
2386        }
2387        TimerTask pollingTask = new TimerTask() {
2388          int tries = 0;
2389          long startTime = EnvironmentEdgeManager.currentTime();
2390          long endTime = startTime + expectedTimeout;
2391          long maxPauseTime = expectedTimeout / maxAttempts;
2392
2393          @Override
2394          public void run(Timeout timeout) throws Exception {
2395            if (EnvironmentEdgeManager.currentTime() < endTime) {
2396              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2397                if (err2 != null) {
2398                  future.completeExceptionally(err2);
2399                  return;
2400                }
2401                if (done) {
2402                  future.complete(null);
2403                } else {
2404                  // retry again after pauseTime.
2405                  long pauseTime =
2406                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2407                  pauseTime = Math.min(pauseTime, maxPauseTime);
2408                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2409                    TimeUnit.MICROSECONDS);
2410                }
2411              });
2412            } else {
2413              future.completeExceptionally(new IOException("Procedure '" + signature + " : "
2414                + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2415            }
2416          }
2417        };
2418        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2419        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2420      });
2421    return future;
2422  }
2423
2424  @Override
2425  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2426    Map<String, String> props) {
2427    ProcedureDescription proDesc =
2428      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2429    return this.<byte[]> newMasterCaller()
2430      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2431        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2432        (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2433        resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2434      .call();
2435  }
2436
2437  @Override
2438  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2439    Map<String, String> props) {
2440    ProcedureDescription proDesc =
2441      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2442    return this.<Boolean> newMasterCaller()
2443      .action(
2444        (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(
2445          controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2446          (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2447      .call();
2448  }
2449
2450  @Override
2451  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2452    return this.<Boolean> newMasterCaller().action(
2453      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2454        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2455        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2456      .call();
2457  }
2458
2459  @Override
2460  public CompletableFuture<String> getProcedures() {
2461    return this.<String> newMasterCaller()
2462      .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call(
2463        controller, stub, GetProceduresRequest.newBuilder().build(),
2464        (s, c, req, done) -> s.getProcedures(c, req, done),
2465        resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList())))
2466      .call();
2467  }
2468
2469  @Override
2470  public CompletableFuture<String> getLocks() {
2471    return this.<String> newMasterCaller()
2472      .action(
2473        (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller,
2474          stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done),
2475          resp -> ProtobufUtil.toLockJson(resp.getLockList())))
2476      .call();
2477  }
2478
2479  @Override
2480  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
2481    boolean offload) {
2482    return this.<Void> newMasterCaller()
2483      .action((controller, stub) -> this.<DecommissionRegionServersRequest,
2484        DecommissionRegionServersResponse, Void> call(controller, stub,
2485          RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2486          (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2487      .call();
2488  }
2489
2490  @Override
2491  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2492    return this.<List<ServerName>> newMasterCaller()
2493      .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest,
2494        ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub,
2495          ListDecommissionedRegionServersRequest.newBuilder().build(),
2496          (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2497          resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2498            .collect(Collectors.toList())))
2499      .call();
2500  }
2501
2502  @Override
2503  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2504    List<byte[]> encodedRegionNames) {
2505    return this.<Void> newMasterCaller()
2506      .action((controller, stub) -> this.<RecommissionRegionServerRequest,
2507        RecommissionRegionServerResponse, Void> call(controller, stub,
2508          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2509          (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2510      .call();
2511  }
2512
2513  /**
2514   * Get the region location for the passed region name. The region name may be a full region name
2515   * or encoded region name. If the region does not found, then it'll throw an
2516   * UnknownRegionException wrapped by a {@link CompletableFuture}
2517   * @param regionNameOrEncodedRegionName region name or encoded region name
2518   * @return region location, wrapped by a {@link CompletableFuture}
2519   */
2520  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2521    if (regionNameOrEncodedRegionName == null) {
2522      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2523    }
2524
2525    CompletableFuture<Optional<HRegionLocation>> future;
2526    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2527      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2528      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2529        // old format encodedName, should be meta region
2530        future = connection.registry.getMetaRegionLocations()
2531          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2532            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2533      } else {
2534        future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2535          regionNameOrEncodedRegionName);
2536      }
2537    } else {
2538      // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2539      // it needs to throw out IllegalArgumentException in case tableName is passed in.
2540      RegionInfo regionInfo;
2541      try {
2542        regionInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2543      } catch (IOException ioe) {
2544        return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2545      }
2546
2547      if (regionInfo.isMetaRegion()) {
2548        future = connection.registry.getMetaRegionLocations()
2549          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2550            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2551            .findFirst());
2552      } else {
2553        future = AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2554      }
2555    }
2556
2557    CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2558    addListener(future, (location, err) -> {
2559      if (err != null) {
2560        returnedFuture.completeExceptionally(err);
2561        return;
2562      }
2563      if (!location.isPresent() || location.get().getRegion() == null) {
2564        returnedFuture.completeExceptionally(
2565          new UnknownRegionException("Invalid region name or encoded region name: "
2566            + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2567      } else {
2568        returnedFuture.complete(location.get());
2569      }
2570    });
2571    return returnedFuture;
2572  }
2573
2574  /**
2575   * Get the region info for the passed region name. The region name may be a full region name or
2576   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2577   * wrapped by a {@link CompletableFuture}
2578   * @return region info, wrapped by a {@link CompletableFuture}
2579   */
2580  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2581    if (regionNameOrEncodedRegionName == null) {
2582      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2583    }
2584
2585    if (
2586      Bytes.equals(regionNameOrEncodedRegionName,
2587        RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
2588        || Bytes.equals(regionNameOrEncodedRegionName,
2589          RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())
2590    ) {
2591      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2592    }
2593
2594    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2595    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2596      if (err != null) {
2597        future.completeExceptionally(err);
2598      } else {
2599        future.complete(location.getRegion());
2600      }
2601    });
2602    return future;
2603  }
2604
2605  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2606    if (numRegions < 3) {
2607      throw new IllegalArgumentException("Must create at least three regions");
2608    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2609      throw new IllegalArgumentException("Start key must be smaller than end key");
2610    }
2611    if (numRegions == 3) {
2612      return new byte[][] { startKey, endKey };
2613    }
2614    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2615    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2616      throw new IllegalArgumentException("Unable to split key range into enough regions");
2617    }
2618    return splitKeys;
2619  }
2620
2621  private void verifySplitKeys(byte[][] splitKeys) {
2622    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2623    // Verify there are no duplicate split keys
2624    byte[] lastKey = null;
2625    for (byte[] splitKey : splitKeys) {
2626      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2627        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2628      }
2629      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2630        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2631          + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2632      }
2633      lastKey = splitKey;
2634    }
2635  }
2636
2637  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2638
2639    abstract void onFinished();
2640
2641    abstract void onError(Throwable error);
2642
2643    @Override
2644    public void accept(Void v, Throwable error) {
2645      if (error != null) {
2646        onError(error);
2647        return;
2648      }
2649      onFinished();
2650    }
2651  }
2652
2653  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2654    protected final TableName tableName;
2655
2656    TableProcedureBiConsumer(TableName tableName) {
2657      this.tableName = tableName;
2658    }
2659
2660    abstract String getOperationType();
2661
2662    String getDescription() {
2663      return "Operation: " + getOperationType() + ", " + "Table Name: "
2664        + tableName.getNameWithNamespaceInclAsString();
2665    }
2666
2667    @Override
2668    void onFinished() {
2669      LOG.info(getDescription() + " completed");
2670    }
2671
2672    @Override
2673    void onError(Throwable error) {
2674      LOG.info(getDescription() + " failed with " + error.getMessage());
2675    }
2676  }
2677
2678  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2679    protected final String namespaceName;
2680
2681    NamespaceProcedureBiConsumer(String namespaceName) {
2682      this.namespaceName = namespaceName;
2683    }
2684
2685    abstract String getOperationType();
2686
2687    String getDescription() {
2688      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2689    }
2690
2691    @Override
2692    void onFinished() {
2693      LOG.info(getDescription() + " completed");
2694    }
2695
2696    @Override
2697    void onError(Throwable error) {
2698      LOG.info(getDescription() + " failed with " + error.getMessage());
2699    }
2700  }
2701
2702  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2703
2704    CreateTableProcedureBiConsumer(TableName tableName) {
2705      super(tableName);
2706    }
2707
2708    @Override
2709    String getOperationType() {
2710      return "CREATE";
2711    }
2712  }
2713
2714  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2715
2716    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2717      super(tableName);
2718    }
2719
2720    @Override
2721    String getOperationType() {
2722      return "MODIFY";
2723    }
2724  }
2725
2726  private static class ModifyTableStoreFileTrackerProcedureBiConsumer
2727    extends TableProcedureBiConsumer {
2728
2729    ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2730      super(tableName);
2731    }
2732
2733    @Override
2734    String getOperationType() {
2735      return "MODIFY_TABLE_STORE_FILE_TRACKER";
2736    }
2737  }
2738
2739  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2740
2741    DeleteTableProcedureBiConsumer(TableName tableName) {
2742      super(tableName);
2743    }
2744
2745    @Override
2746    String getOperationType() {
2747      return "DELETE";
2748    }
2749
2750    @Override
2751    void onFinished() {
2752      connection.getLocator().clearCache(this.tableName);
2753      super.onFinished();
2754    }
2755  }
2756
2757  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2758
2759    TruncateTableProcedureBiConsumer(TableName tableName) {
2760      super(tableName);
2761    }
2762
2763    @Override
2764    String getOperationType() {
2765      return "TRUNCATE";
2766    }
2767  }
2768
2769  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2770
2771    EnableTableProcedureBiConsumer(TableName tableName) {
2772      super(tableName);
2773    }
2774
2775    @Override
2776    String getOperationType() {
2777      return "ENABLE";
2778    }
2779  }
2780
2781  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2782
2783    DisableTableProcedureBiConsumer(TableName tableName) {
2784      super(tableName);
2785    }
2786
2787    @Override
2788    String getOperationType() {
2789      return "DISABLE";
2790    }
2791  }
2792
2793  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2794
2795    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2796      super(tableName);
2797    }
2798
2799    @Override
2800    String getOperationType() {
2801      return "ADD_COLUMN_FAMILY";
2802    }
2803  }
2804
2805  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2806
2807    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2808      super(tableName);
2809    }
2810
2811    @Override
2812    String getOperationType() {
2813      return "DELETE_COLUMN_FAMILY";
2814    }
2815  }
2816
2817  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2818
2819    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2820      super(tableName);
2821    }
2822
2823    @Override
2824    String getOperationType() {
2825      return "MODIFY_COLUMN_FAMILY";
2826    }
2827  }
2828
2829  private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer
2830    extends TableProcedureBiConsumer {
2831
2832    ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) {
2833      super(tableName);
2834    }
2835
2836    @Override
2837    String getOperationType() {
2838      return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER";
2839    }
2840  }
2841
2842  private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer {
2843
2844    FlushTableProcedureBiConsumer(TableName tableName) {
2845      super(tableName);
2846    }
2847
2848    @Override
2849    String getOperationType() {
2850      return "FLUSH";
2851    }
2852  }
2853
2854  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2855
2856    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2857      super(namespaceName);
2858    }
2859
2860    @Override
2861    String getOperationType() {
2862      return "CREATE_NAMESPACE";
2863    }
2864  }
2865
2866  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2867
2868    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2869      super(namespaceName);
2870    }
2871
2872    @Override
2873    String getOperationType() {
2874      return "DELETE_NAMESPACE";
2875    }
2876  }
2877
2878  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2879
2880    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2881      super(namespaceName);
2882    }
2883
2884    @Override
2885    String getOperationType() {
2886      return "MODIFY_NAMESPACE";
2887    }
2888  }
2889
2890  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2891
2892    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2893      super(tableName);
2894    }
2895
2896    @Override
2897    String getOperationType() {
2898      return "MERGE_REGIONS";
2899    }
2900  }
2901
2902  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2903
2904    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2905      super(tableName);
2906    }
2907
2908    @Override
2909    String getOperationType() {
2910      return "SPLIT_REGION";
2911    }
2912  }
2913
2914  private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2915
2916    TruncateRegionProcedureBiConsumer(TableName tableName) {
2917      super(tableName);
2918    }
2919
2920    @Override
2921    String getOperationType() {
2922      return "TRUNCATE_REGION";
2923    }
2924  }
2925
2926  private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
2927    SnapshotProcedureBiConsumer(TableName tableName) {
2928      super(tableName);
2929    }
2930
2931    @Override
2932    String getOperationType() {
2933      return "SNAPSHOT";
2934    }
2935  }
2936
2937  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2938    private final String peerId;
2939    private final Supplier<String> getOperation;
2940
2941    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2942      this.peerId = peerId;
2943      this.getOperation = getOperation;
2944    }
2945
2946    String getDescription() {
2947      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2948    }
2949
2950    @Override
2951    void onFinished() {
2952      LOG.info(getDescription() + " completed");
2953    }
2954
2955    @Override
2956    void onError(Throwable error) {
2957      LOG.info(getDescription() + " failed with " + error.getMessage());
2958    }
2959  }
2960
2961  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2962    CompletableFuture<Void> future = new CompletableFuture<>();
2963    addListener(procFuture, (procId, error) -> {
2964      if (error != null) {
2965        future.completeExceptionally(error);
2966        return;
2967      }
2968      getProcedureResult(procId, future, 0);
2969    });
2970    return future;
2971  }
2972
2973  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2974    addListener(
2975      this.<GetProcedureResultResponse> newMasterCaller()
2976        .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse,
2977          GetProcedureResultResponse> call(controller, stub,
2978            GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2979            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2980        .call(),
2981      (response, error) -> {
2982        if (error != null) {
2983          LOG.warn("failed to get the procedure result procId={}", procId,
2984            ConnectionUtils.translateException(error));
2985          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2986            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2987          return;
2988        }
2989        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2990          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2991            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2992          return;
2993        }
2994        if (response.hasException()) {
2995          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2996          future.completeExceptionally(ioe);
2997        } else {
2998          future.complete(null);
2999        }
3000      });
3001  }
3002
3003  private <T> CompletableFuture<T> failedFuture(Throwable error) {
3004    CompletableFuture<T> future = new CompletableFuture<>();
3005    future.completeExceptionally(error);
3006    return future;
3007  }
3008
3009  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
3010    if (error != null) {
3011      future.completeExceptionally(error);
3012      return true;
3013    }
3014    return false;
3015  }
3016
3017  @Override
3018  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
3019    return getClusterMetrics(EnumSet.allOf(Option.class));
3020  }
3021
3022  @Override
3023  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
3024    return this.<ClusterMetrics> newMasterCaller()
3025      .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse,
3026        ClusterMetrics> call(controller, stub,
3027          RequestConverter.buildGetClusterStatusRequest(options),
3028          (s, c, req, done) -> s.getClusterStatus(c, req, done),
3029          resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus())))
3030      .call();
3031  }
3032
3033  @Override
3034  public CompletableFuture<Void> shutdown() {
3035    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3036      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
3037        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
3038        resp -> null))
3039      .call();
3040  }
3041
3042  @Override
3043  public CompletableFuture<Void> stopMaster() {
3044    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3045      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
3046        controller, stub, StopMasterRequest.newBuilder().build(),
3047        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
3048      .call();
3049  }
3050
3051  @Override
3052  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
3053    StopServerRequest request = RequestConverter
3054      .buildStopServerRequest("Called by admin client " + this.connection.toString());
3055    return this.<Void> newAdminCaller().priority(HIGH_QOS)
3056      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
3057        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
3058        resp -> null))
3059      .serverName(serverName).call();
3060  }
3061
3062  @Override
3063  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
3064    return this.<Void> newAdminCaller()
3065      .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse,
3066        Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
3067          (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
3068      .serverName(serverName).call();
3069  }
3070
3071  @Override
3072  public CompletableFuture<Void> updateConfiguration() {
3073    CompletableFuture<Void> future = new CompletableFuture<Void>();
3074    addListener(
3075      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
3076      (status, err) -> {
3077        if (err != null) {
3078          future.completeExceptionally(err);
3079        } else {
3080          List<CompletableFuture<Void>> futures = new ArrayList<>();
3081          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
3082          futures.add(updateConfiguration(status.getMasterName()));
3083          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
3084          addListener(
3085            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3086            (result, err2) -> {
3087              if (err2 != null) {
3088                future.completeExceptionally(err2);
3089              } else {
3090                future.complete(result);
3091              }
3092            });
3093        }
3094      });
3095    return future;
3096  }
3097
3098  @Override
3099  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
3100    return this.<Void> newAdminCaller()
3101      .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse,
3102        Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(),
3103          (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
3104      .serverName(serverName).call();
3105  }
3106
3107  @Override
3108  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
3109    return this.<Void> newAdminCaller()
3110      .action((controller, stub) -> this.<ClearCompactionQueuesRequest,
3111        ClearCompactionQueuesResponse, Void> adminCall(controller, stub,
3112          RequestConverter.buildClearCompactionQueuesRequest(queues),
3113          (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
3114      .serverName(serverName).call();
3115  }
3116
3117  @Override
3118  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
3119    return this.<List<SecurityCapability>> newMasterCaller()
3120      .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse,
3121        List<SecurityCapability>> call(controller, stub,
3122          SecurityCapabilitiesRequest.newBuilder().build(),
3123          (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
3124          (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
3125      .call();
3126  }
3127
3128  @Override
3129  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
3130    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
3131  }
3132
3133  @Override
3134  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
3135    TableName tableName) {
3136    Preconditions.checkNotNull(tableName,
3137      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
3138    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
3139  }
3140
3141  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
3142    ServerName serverName) {
3143    return this.<List<RegionMetrics>> newAdminCaller()
3144      .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse,
3145        List<RegionMetrics>> adminCall(controller, stub, request,
3146          (s, c, req, done) -> s.getRegionLoad(controller, req, done),
3147          RegionMetricsBuilder::toRegionMetrics))
3148      .serverName(serverName).call();
3149  }
3150
3151  @Override
3152  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
3153    return this.<Boolean> newMasterCaller()
3154      .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse,
3155        Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(),
3156          (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
3157          resp -> resp.getInMaintenanceMode()))
3158      .call();
3159  }
3160
3161  @Override
3162  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
3163    CompactType compactType) {
3164    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3165
3166    switch (compactType) {
3167      case MOB:
3168        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
3169          if (err != null) {
3170            future.completeExceptionally(err);
3171            return;
3172          }
3173          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
3174
3175          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
3176            .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3177              GetRegionInfoResponse> adminCall(controller, stub,
3178                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
3179                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3180            .call(), (resp2, err2) -> {
3181              if (err2 != null) {
3182                future.completeExceptionally(err2);
3183              } else {
3184                if (resp2.hasCompactionState()) {
3185                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3186                } else {
3187                  future.complete(CompactionState.NONE);
3188                }
3189              }
3190            });
3191        });
3192        break;
3193      case NORMAL:
3194        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3195          if (err != null) {
3196            future.completeExceptionally(err);
3197            return;
3198          }
3199          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
3200          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
3201          locations.stream().filter(loc -> loc.getServerName() != null)
3202            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
3203            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
3204              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
3205                // If any region compaction state is MAJOR_AND_MINOR
3206                // the table compaction state is MAJOR_AND_MINOR, too.
3207                if (err2 != null) {
3208                  future.completeExceptionally(unwrapCompletionException(err2));
3209                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
3210                  future.complete(regionState);
3211                } else {
3212                  regionStates.add(regionState);
3213                }
3214              }));
3215            });
3216          addListener(
3217            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3218            (ret, err3) -> {
3219              // If future not completed, check all regions's compaction state
3220              if (!future.isCompletedExceptionally() && !future.isDone()) {
3221                CompactionState state = CompactionState.NONE;
3222                for (CompactionState regionState : regionStates) {
3223                  switch (regionState) {
3224                    case MAJOR:
3225                      if (state == CompactionState.MINOR) {
3226                        future.complete(CompactionState.MAJOR_AND_MINOR);
3227                      } else {
3228                        state = CompactionState.MAJOR;
3229                      }
3230                      break;
3231                    case MINOR:
3232                      if (state == CompactionState.MAJOR) {
3233                        future.complete(CompactionState.MAJOR_AND_MINOR);
3234                      } else {
3235                        state = CompactionState.MINOR;
3236                      }
3237                      break;
3238                    case NONE:
3239                    default:
3240                  }
3241                }
3242                if (!future.isDone()) {
3243                  future.complete(state);
3244                }
3245              }
3246            });
3247        });
3248        break;
3249      default:
3250        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3251    }
3252
3253    return future;
3254  }
3255
3256  @Override
3257  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3258    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3259    addListener(getRegionLocation(regionName), (location, err) -> {
3260      if (err != null) {
3261        future.completeExceptionally(err);
3262        return;
3263      }
3264      ServerName serverName = location.getServerName();
3265      if (serverName == null) {
3266        future
3267          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3268        return;
3269      }
3270      addListener(
3271        this.<GetRegionInfoResponse> newAdminCaller()
3272          .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3273            GetRegionInfoResponse> adminCall(controller, stub,
3274              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3275                true),
3276              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3277          .serverName(serverName).call(),
3278        (resp2, err2) -> {
3279          if (err2 != null) {
3280            future.completeExceptionally(err2);
3281          } else {
3282            if (resp2.hasCompactionState()) {
3283              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3284            } else {
3285              future.complete(CompactionState.NONE);
3286            }
3287          }
3288        });
3289    });
3290    return future;
3291  }
3292
3293  @Override
3294  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3295    MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder()
3296      .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3297    return this.<Optional<Long>> newMasterCaller()
3298      .action((controller, stub) -> this.<MajorCompactionTimestampRequest,
3299        MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request,
3300          (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3301          ProtobufUtil::toOptionalTimestamp))
3302      .call();
3303  }
3304
3305  @Override
3306  public CompletableFuture<Optional<Long>>
3307    getLastMajorCompactionTimestampForRegion(byte[] regionName) {
3308    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3309    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3310    addListener(getRegionInfo(regionName), (region, err) -> {
3311      if (err != null) {
3312        future.completeExceptionally(err);
3313        return;
3314      }
3315      MajorCompactionTimestampForRegionRequest.Builder builder =
3316        MajorCompactionTimestampForRegionRequest.newBuilder();
3317      builder.setRegion(
3318        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3319      addListener(this.<Optional<Long>> newMasterCaller()
3320        .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest,
3321          MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(),
3322            (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3323            ProtobufUtil::toOptionalTimestamp))
3324        .call(), (timestamp, err2) -> {
3325          if (err2 != null) {
3326            future.completeExceptionally(err2);
3327          } else {
3328            future.complete(timestamp);
3329          }
3330        });
3331    });
3332    return future;
3333  }
3334
3335  @Override
3336  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3337    List<String> serverNamesList) {
3338    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3339    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3340      if (err != null) {
3341        future.completeExceptionally(err);
3342        return;
3343      }
3344      // Accessed by multiple threads.
3345      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3346      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3347      serverNames.stream().forEach(serverName -> {
3348        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3349          if (err2 != null) {
3350            future.completeExceptionally(unwrapCompletionException(err2));
3351          } else {
3352            serverStates.put(serverName, serverState);
3353          }
3354        }));
3355      });
3356      addListener(
3357        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3358        (ret, err3) -> {
3359          if (!future.isCompletedExceptionally()) {
3360            if (err3 != null) {
3361              future.completeExceptionally(err3);
3362            } else {
3363              future.complete(serverStates);
3364            }
3365          }
3366        });
3367    });
3368    return future;
3369  }
3370
3371  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3372    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3373    if (serverNamesList.isEmpty()) {
3374      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3375        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3376      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3377        if (err != null) {
3378          future.completeExceptionally(err);
3379        } else {
3380          future.complete(clusterMetrics.getServersName());
3381        }
3382      });
3383      return future;
3384    } else {
3385      List<ServerName> serverList = new ArrayList<>();
3386      for (String regionServerName : serverNamesList) {
3387        ServerName serverName = null;
3388        try {
3389          serverName = ServerName.valueOf(regionServerName);
3390        } catch (Exception e) {
3391          future.completeExceptionally(
3392            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3393        }
3394        if (serverName == null) {
3395          future.completeExceptionally(
3396            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3397        } else {
3398          serverList.add(serverName);
3399        }
3400      }
3401      future.complete(serverList);
3402    }
3403    return future;
3404  }
3405
3406  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3407    return this.<Boolean> newAdminCaller().serverName(serverName)
3408      .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3409        Boolean> adminCall(controller, stub,
3410          CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(),
3411          (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState()))
3412      .call();
3413  }
3414
3415  @Override
3416  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3417    return this.<Boolean> newMasterCaller()
3418      .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse,
3419        Boolean> call(controller, stub,
3420          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3421          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3422          (resp) -> resp.getPrevBalanceValue()))
3423      .call();
3424  }
3425
3426  @Override
3427  public CompletableFuture<BalanceResponse> balance(BalanceRequest request) {
3428    return this.<BalanceResponse> newMasterCaller()
3429      .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse,
3430        BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request),
3431          (s, c, req, done) -> s.balance(c, req, done),
3432          (resp) -> ProtobufUtil.toBalanceResponse(resp)))
3433      .call();
3434  }
3435
3436  @Override
3437  public CompletableFuture<Boolean> isBalancerEnabled() {
3438    return this.<Boolean> newMasterCaller()
3439      .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse,
3440        Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3441          (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3442      .call();
3443  }
3444
3445  @Override
3446  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3447    return this.<Boolean> newMasterCaller()
3448      .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse,
3449        Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on),
3450          (s, c, req, done) -> s.setNormalizerRunning(c, req, done),
3451          (resp) -> resp.getPrevNormalizerValue()))
3452      .call();
3453  }
3454
3455  @Override
3456  public CompletableFuture<Boolean> isNormalizerEnabled() {
3457    return this.<Boolean> newMasterCaller()
3458      .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse,
3459        Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3460          (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3461      .call();
3462  }
3463
3464  @Override
3465  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3466    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3467  }
3468
3469  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3470    return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub,
3471      request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call();
3472  }
3473
3474  @Override
3475  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3476    return this.<Boolean> newMasterCaller()
3477      .action((controller, stub) -> this.<SetCleanerChoreRunningRequest,
3478        SetCleanerChoreRunningResponse, Boolean> call(controller, stub,
3479          RequestConverter.buildSetCleanerChoreRunningRequest(enabled),
3480          (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done),
3481          (resp) -> resp.getPrevValue()))
3482      .call();
3483  }
3484
3485  @Override
3486  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3487    return this.<Boolean> newMasterCaller()
3488      .action(
3489        (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse,
3490          Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(),
3491            (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3492      .call();
3493  }
3494
3495  @Override
3496  public CompletableFuture<Boolean> runCleanerChore() {
3497    return this.<Boolean> newMasterCaller()
3498      .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse,
3499        Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(),
3500          (s, c, req, done) -> s.runCleanerChore(c, req, done),
3501          (resp) -> resp.getCleanerChoreRan()))
3502      .call();
3503  }
3504
3505  @Override
3506  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3507    return this.<Boolean> newMasterCaller()
3508      .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse,
3509        Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled),
3510          (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue()))
3511      .call();
3512  }
3513
3514  @Override
3515  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3516    return this.<Boolean> newMasterCaller()
3517      .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest,
3518        IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub,
3519          RequestConverter.buildIsCatalogJanitorEnabledRequest(),
3520          (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue()))
3521      .call();
3522  }
3523
3524  @Override
3525  public CompletableFuture<Integer> runCatalogJanitor() {
3526    return this.<Integer> newMasterCaller()
3527      .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse,
3528        Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(),
3529          (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3530      .call();
3531  }
3532
3533  @Override
3534  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3535    ServiceCaller<S, R> callable) {
3536    MasterCoprocessorRpcChannelImpl channel =
3537      new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3538    S stub = stubMaker.apply(channel);
3539    CompletableFuture<R> future = new CompletableFuture<>();
3540    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3541    callable.call(stub, controller, resp -> {
3542      if (controller.failed()) {
3543        future.completeExceptionally(controller.getFailed());
3544      } else {
3545        future.complete(resp);
3546      }
3547    });
3548    return future;
3549  }
3550
3551  @Override
3552  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3553    ServiceCaller<S, R> callable, ServerName serverName) {
3554    RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl(
3555      this.<Message> newServerCaller().serverName(serverName));
3556    S stub = stubMaker.apply(channel);
3557    CompletableFuture<R> future = new CompletableFuture<>();
3558    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3559    callable.call(stub, controller, resp -> {
3560      if (controller.failed()) {
3561        future.completeExceptionally(controller.getFailed());
3562      } else {
3563        future.complete(resp);
3564      }
3565    });
3566    return future;
3567  }
3568
3569  @Override
3570  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3571    return this.<List<ServerName>> newMasterCaller()
3572      .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse,
3573        List<ServerName>> call(controller, stub,
3574          RequestConverter.buildClearDeadServersRequest(servers),
3575          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3576          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3577      .call();
3578  }
3579
3580  private <T> ServerRequestCallerBuilder<T> newServerCaller() {
3581    return this.connection.callerFactory.<T> serverRequest()
3582      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3583      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3584      .pause(pauseNs, TimeUnit.NANOSECONDS)
3585      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
3586      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3587  }
3588
3589  @Override
3590  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3591    if (tableName == null) {
3592      return failedFuture(new IllegalArgumentException("Table name is null"));
3593    }
3594    CompletableFuture<Void> future = new CompletableFuture<>();
3595    addListener(tableExists(tableName), (exist, err) -> {
3596      if (err != null) {
3597        future.completeExceptionally(err);
3598        return;
3599      }
3600      if (!exist) {
3601        future.completeExceptionally(new TableNotFoundException(
3602          "Table '" + tableName.getNameAsString() + "' does not exists."));
3603        return;
3604      }
3605      addListener(getTableSplits(tableName), (splits, err1) -> {
3606        if (err1 != null) {
3607          future.completeExceptionally(err1);
3608        } else {
3609          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3610            if (err2 != null) {
3611              future.completeExceptionally(err2);
3612            } else {
3613              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3614                if (err3 != null) {
3615                  future.completeExceptionally(err3);
3616                } else {
3617                  future.complete(result3);
3618                }
3619              });
3620            }
3621          });
3622        }
3623      });
3624    });
3625    return future;
3626  }
3627
3628  @Override
3629  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3630    if (tableName == null) {
3631      return failedFuture(new IllegalArgumentException("Table name is null"));
3632    }
3633    CompletableFuture<Void> future = new CompletableFuture<>();
3634    addListener(tableExists(tableName), (exist, err) -> {
3635      if (err != null) {
3636        future.completeExceptionally(err);
3637        return;
3638      }
3639      if (!exist) {
3640        future.completeExceptionally(new TableNotFoundException(
3641          "Table '" + tableName.getNameAsString() + "' does not exists."));
3642        return;
3643      }
3644      addListener(setTableReplication(tableName, false), (result, err2) -> {
3645        if (err2 != null) {
3646          future.completeExceptionally(err2);
3647        } else {
3648          future.complete(result);
3649        }
3650      });
3651    });
3652    return future;
3653  }
3654
3655  @Override
3656  public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
3657    GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
3658    request.setPeerId(peerId);
3659    return this.<Boolean> newMasterCaller()
3660      .action((controller, stub) -> this.<GetReplicationPeerStateRequest,
3661        GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
3662          (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
3663          resp -> resp.getIsEnabled()))
3664      .call();
3665  }
3666
3667  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3668    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3669    addListener(
3670      getRegions(tableName).thenApply(regions -> regions.stream()
3671        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3672      (regions, err2) -> {
3673        if (err2 != null) {
3674          future.completeExceptionally(err2);
3675          return;
3676        }
3677        if (regions.size() == 1) {
3678          future.complete(null);
3679        } else {
3680          byte[][] splits = new byte[regions.size() - 1][];
3681          for (int i = 1; i < regions.size(); i++) {
3682            splits[i - 1] = regions.get(i).getStartKey();
3683          }
3684          future.complete(splits);
3685        }
3686      });
3687    return future;
3688  }
3689
3690  /**
3691   * Connect to peer and check the table descriptor on peer:
3692   * <ol>
3693   * <li>Create the same table on peer when not exist.</li>
3694   * <li>Throw an exception if the table already has replication enabled on any of the column
3695   * families.</li>
3696   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3697   * </ol>
3698   * @param tableName name of the table to sync to the peer
3699   * @param splits    table split keys
3700   */
3701  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3702    byte[][] splits) {
3703    CompletableFuture<Void> future = new CompletableFuture<>();
3704    addListener(listReplicationPeers(), (peers, err) -> {
3705      if (err != null) {
3706        future.completeExceptionally(err);
3707        return;
3708      }
3709      if (peers == null || peers.size() <= 0) {
3710        future.completeExceptionally(
3711          new IllegalArgumentException("Found no peer cluster for replication."));
3712        return;
3713      }
3714      List<CompletableFuture<Void>> futures = new ArrayList<>();
3715      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3716        .forEach(peer -> {
3717          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3718        });
3719      addListener(
3720        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3721        (result, err2) -> {
3722          if (err2 != null) {
3723            future.completeExceptionally(err2);
3724          } else {
3725            future.complete(result);
3726          }
3727        });
3728    });
3729    return future;
3730  }
3731
3732  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3733    ReplicationPeerDescription peer) {
3734    Configuration peerConf = null;
3735    try {
3736      peerConf =
3737        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3738    } catch (IOException e) {
3739      return failedFuture(e);
3740    }
3741    CompletableFuture<Void> future = new CompletableFuture<>();
3742    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3743      if (err != null) {
3744        future.completeExceptionally(err);
3745        return;
3746      }
3747      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3748        if (err1 != null) {
3749          future.completeExceptionally(err1);
3750          return;
3751        }
3752        AsyncAdmin peerAdmin = conn.getAdmin();
3753        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3754          if (err2 != null) {
3755            future.completeExceptionally(err2);
3756            return;
3757          }
3758          if (!exist) {
3759            CompletableFuture<Void> createTableFuture = null;
3760            if (splits == null) {
3761              createTableFuture = peerAdmin.createTable(tableDesc);
3762            } else {
3763              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3764            }
3765            addListener(createTableFuture, (result, err3) -> {
3766              if (err3 != null) {
3767                future.completeExceptionally(err3);
3768              } else {
3769                future.complete(result);
3770              }
3771            });
3772          } else {
3773            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3774              (result, err4) -> {
3775                if (err4 != null) {
3776                  future.completeExceptionally(err4);
3777                } else {
3778                  future.complete(result);
3779                }
3780              });
3781          }
3782        });
3783      });
3784    });
3785    return future;
3786  }
3787
3788  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3789    TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3790    CompletableFuture<Void> future = new CompletableFuture<>();
3791    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3792      if (err != null) {
3793        future.completeExceptionally(err);
3794        return;
3795      }
3796      if (peerTableDesc == null) {
3797        future.completeExceptionally(
3798          new IllegalArgumentException("Failed to get table descriptor for table "
3799            + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3800        return;
3801      }
3802      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3803        future.completeExceptionally(new IllegalArgumentException(
3804          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
3805            + ", but the table descriptors are not same when compared with source cluster."
3806            + " Thus can not enable the table's replication switch."));
3807        return;
3808      }
3809      future.complete(null);
3810    });
3811    return future;
3812  }
3813
3814  /**
3815   * Set the table's replication switch if the table's replication switch is already not set.
3816   * @param tableName name of the table
3817   * @param enableRep is replication switch enable or disable
3818   */
3819  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3820    CompletableFuture<Void> future = new CompletableFuture<>();
3821    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3822      if (err != null) {
3823        future.completeExceptionally(err);
3824        return;
3825      }
3826      if (!tableDesc.matchReplicationScope(enableRep)) {
3827        int scope =
3828          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3829        TableDescriptor newTableDesc =
3830          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3831        addListener(modifyTable(newTableDesc), (result, err2) -> {
3832          if (err2 != null) {
3833            future.completeExceptionally(err2);
3834          } else {
3835            future.complete(result);
3836          }
3837        });
3838      } else {
3839        future.complete(null);
3840      }
3841    });
3842    return future;
3843  }
3844
3845  private void waitUntilAllReplicationPeerModificationProceduresDone(
3846    CompletableFuture<Boolean> future, boolean prevOn, int retries) {
3847    CompletableFuture<List<ProcedureProtos.Procedure>> callFuture =
3848      this.<List<ProcedureProtos.Procedure>> newMasterCaller()
3849        .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest,
3850          GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
3851            controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
3852            (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done),
3853            resp -> resp.getProcedureList()))
3854        .call();
3855    addListener(callFuture, (r, e) -> {
3856      if (e != null) {
3857        future.completeExceptionally(e);
3858      } else if (r.isEmpty()) {
3859        // we are done
3860        future.complete(prevOn);
3861      } else {
3862        // retry later to see if the procedures are done
3863        retryTimer.newTimeout(
3864          t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1),
3865          ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3866      }
3867    });
3868  }
3869
3870  @Override
3871  public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
3872    boolean drainProcedures) {
3873    ReplicationPeerModificationSwitchRequest request =
3874      ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
3875    CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller()
3876      .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest,
3877        ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request,
3878          (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done),
3879          resp -> resp.getPreviousValue()))
3880      .call();
3881    // if we do not need to wait all previous peer modification procedure done, or we are enabling
3882    // peer modification, just return here.
3883    if (!drainProcedures || on) {
3884      return callFuture;
3885    }
3886    // otherwise we need to wait until all previous peer modification procedure done
3887    CompletableFuture<Boolean> future = new CompletableFuture<>();
3888    addListener(callFuture, (prevOn, err) -> {
3889      if (err != null) {
3890        future.completeExceptionally(err);
3891        return;
3892      }
3893      // even if the previous state is disabled, we still need to wait here, as there could be
3894      // another client thread which called this method just before us and have already changed the
3895      // state to off, but there are still peer modification procedures not finished, so we should
3896      // also wait here.
3897      waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0);
3898    });
3899    return future;
3900  }
3901
3902  @Override
3903  public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
3904    return this.<Boolean> newMasterCaller()
3905      .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest,
3906        IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub,
3907          IsReplicationPeerModificationEnabledRequest.getDefaultInstance(),
3908          (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done),
3909          (resp) -> resp.getEnabled()))
3910      .call();
3911  }
3912
3913  @Override
3914  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3915    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3916    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3917      if (err != null) {
3918        future.completeExceptionally(err);
3919        return;
3920      }
3921      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3922        locations.stream().filter(l -> l.getRegion() != null)
3923          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3924          .collect(Collectors.groupingBy(l -> l.getServerName(),
3925            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3926      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3927      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3928      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3929        futures
3930          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3931            if (err2 != null) {
3932              future.completeExceptionally(unwrapCompletionException(err2));
3933            } else {
3934              aggregator.append(stats);
3935            }
3936          }));
3937      }
3938      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3939        (ret, err3) -> {
3940          if (err3 != null) {
3941            future.completeExceptionally(unwrapCompletionException(err3));
3942          } else {
3943            future.complete(aggregator.sum());
3944          }
3945        });
3946    });
3947    return future;
3948  }
3949
3950  @Override
3951  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3952    boolean preserveSplits) {
3953    CompletableFuture<Void> future = new CompletableFuture<>();
3954    addListener(tableExists(tableName), (exist, err) -> {
3955      if (err != null) {
3956        future.completeExceptionally(err);
3957        return;
3958      }
3959      if (!exist) {
3960        future.completeExceptionally(new TableNotFoundException(tableName));
3961        return;
3962      }
3963      addListener(tableExists(newTableName), (exist1, err1) -> {
3964        if (err1 != null) {
3965          future.completeExceptionally(err1);
3966          return;
3967        }
3968        if (exist1) {
3969          future.completeExceptionally(new TableExistsException(newTableName));
3970          return;
3971        }
3972        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3973          if (err2 != null) {
3974            future.completeExceptionally(err2);
3975            return;
3976          }
3977          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3978          if (preserveSplits) {
3979            addListener(getTableSplits(tableName), (splits, err3) -> {
3980              if (err3 != null) {
3981                future.completeExceptionally(err3);
3982              } else {
3983                addListener(
3984                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3985                  (result, err4) -> {
3986                    if (err4 != null) {
3987                      future.completeExceptionally(err4);
3988                    } else {
3989                      future.complete(result);
3990                    }
3991                  });
3992              }
3993            });
3994          } else {
3995            addListener(createTable(newTableDesc), (result, err5) -> {
3996              if (err5 != null) {
3997                future.completeExceptionally(err5);
3998              } else {
3999                future.complete(result);
4000              }
4001            });
4002          }
4003        });
4004      });
4005    });
4006    return future;
4007  }
4008
4009  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
4010    List<RegionInfo> hris) {
4011    return this.<CacheEvictionStats> newAdminCaller()
4012      .action((controller, stub) -> this.<ClearRegionBlockCacheRequest,
4013        ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub,
4014          RequestConverter.buildClearRegionBlockCacheRequest(hris),
4015          (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
4016          resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
4017      .serverName(serverName).call();
4018  }
4019
4020  @Override
4021  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
4022    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4023      .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse,
4024        Boolean> call(controller, stub,
4025          SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
4026          (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
4027          resp -> resp.getPreviousRpcThrottleEnabled()))
4028      .call();
4029    return future;
4030  }
4031
4032  @Override
4033  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
4034    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4035      .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse,
4036        Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
4037          (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
4038          resp -> resp.getRpcThrottleEnabled()))
4039      .call();
4040    return future;
4041  }
4042
4043  @Override
4044  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
4045    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4046      .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest,
4047        SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub,
4048          SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
4049            .build(),
4050          (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
4051          resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
4052      .call();
4053    return future;
4054  }
4055
4056  @Override
4057  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
4058    return this.<Map<TableName, Long>> newMasterCaller()
4059      .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest,
4060        GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub,
4061          RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
4062          (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
4063          resp -> resp.getSizesList().stream().collect(Collectors
4064            .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
4065      .call();
4066  }
4067
4068  @Override
4069  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>>
4070    getRegionServerSpaceQuotaSnapshots(ServerName serverName) {
4071    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
4072      .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest,
4073        GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller,
4074          stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
4075          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
4076          resp -> resp.getSnapshotsList().stream()
4077            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
4078              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
4079      .serverName(serverName).call();
4080  }
4081
4082  private CompletableFuture<SpaceQuotaSnapshot>
4083    getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
4084    return this.<SpaceQuotaSnapshot> newMasterCaller()
4085      .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse,
4086        SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(),
4087          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
4088      .call();
4089  }
4090
4091  @Override
4092  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
4093    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
4094      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
4095      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4096  }
4097
4098  @Override
4099  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
4100    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
4101    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
4102      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
4103      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4104  }
4105
4106  @Override
4107  public CompletableFuture<Void> grant(UserPermission userPermission,
4108    boolean mergeExistingPermissions) {
4109    return this.<Void> newMasterCaller()
4110      .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub,
4111        ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
4112        (s, c, req, done) -> s.grant(c, req, done), resp -> null))
4113      .call();
4114  }
4115
4116  @Override
4117  public CompletableFuture<Void> revoke(UserPermission userPermission) {
4118    return this.<Void> newMasterCaller()
4119      .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
4120        stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
4121        (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
4122      .call();
4123  }
4124
4125  @Override
4126  public CompletableFuture<List<UserPermission>>
4127    getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
4128    return this.<List<UserPermission>> newMasterCaller()
4129      .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest,
4130        GetUserPermissionsResponse, List<UserPermission>> call(controller, stub,
4131          ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
4132          (s, c, req, done) -> s.getUserPermissions(c, req, done),
4133          resp -> resp.getUserPermissionList().stream()
4134            .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
4135            .collect(Collectors.toList())))
4136      .call();
4137  }
4138
4139  @Override
4140  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
4141    List<Permission> permissions) {
4142    return this.<List<Boolean>> newMasterCaller()
4143      .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse,
4144        List<Boolean>> call(controller, stub,
4145          ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
4146          (s, c, req, done) -> s.hasUserPermissions(c, req, done),
4147          resp -> resp.getHasUserPermissionList()))
4148      .call();
4149  }
4150
4151  @Override
4152  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) {
4153    return this.<Boolean> newMasterCaller()
4154      .action((controller, stub) -> this.call(controller, stub,
4155        RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
4156        MasterService.Interface::switchSnapshotCleanup,
4157        SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
4158      .call();
4159  }
4160
4161  @Override
4162  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
4163    return this.<Boolean> newMasterCaller()
4164      .action((controller, stub) -> this.call(controller, stub,
4165        RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
4166        MasterService.Interface::isSnapshotCleanupEnabled,
4167        IsSnapshotCleanupEnabledResponse::getEnabled))
4168      .call();
4169  }
4170
4171  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
4172    final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
4173    final String logType) {
4174    if (CollectionUtils.isEmpty(serverNames)) {
4175      return CompletableFuture.completedFuture(Collections.emptyList());
4176    }
4177    return CompletableFuture.supplyAsync(() -> serverNames
4178      .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName,
4179        filterParams, limit, logType))
4180      .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList()));
4181  }
4182
4183  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
4184    Map<String, Object> filterParams, int limit, String logType) {
4185    return this.<List<LogEntry>> newAdminCaller()
4186      .action((controller, stub) -> this.adminCall(controller, stub,
4187        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
4188        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
4189      .serverName(serverName).call();
4190  }
4191
4192  @Override
4193  public CompletableFuture<List<Boolean>>
4194    clearSlowLogResponses(@Nullable Set<ServerName> serverNames) {
4195    if (CollectionUtils.isEmpty(serverNames)) {
4196      return CompletableFuture.completedFuture(Collections.emptyList());
4197    }
4198    List<CompletableFuture<Boolean>> clearSlowLogResponseList =
4199      serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList());
4200    return convertToFutureOfList(clearSlowLogResponseList);
4201  }
4202
4203  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4204    return this.<Boolean> newAdminCaller()
4205      .action((controller, stub) -> this.adminCall(controller, stub,
4206        RequestConverter.buildClearSlowLogResponseRequest(),
4207        AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload))
4208      .serverName(serverName).call();
4209  }
4210
4211  private static <T> CompletableFuture<List<T>>
4212    convertToFutureOfList(List<CompletableFuture<T>> futures) {
4213    CompletableFuture<Void> allDoneFuture =
4214      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4215    return allDoneFuture
4216      .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
4217  }
4218
4219  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
4220    return this.<List<LogEntry>> newMasterCaller()
4221      .action((controller, stub) -> this.call(controller, stub,
4222        ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries,
4223        ProtobufUtil::toBalancerDecisionResponse))
4224      .call();
4225  }
4226
4227  private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
4228    return this.<List<LogEntry>> newMasterCaller()
4229      .action((controller, stub) -> this.call(controller, stub,
4230        ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries,
4231        ProtobufUtil::toBalancerRejectionResponse))
4232      .call();
4233  }
4234
4235  @Override
4236  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
4237    String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
4238    if (logType == null || serverType == null) {
4239      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
4240    }
4241    switch (logType) {
4242      case "SLOW_LOG":
4243      case "LARGE_LOG":
4244        if (ServerType.MASTER.equals(serverType)) {
4245          throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
4246        }
4247        return getSlowLogResponses(filterParams, serverNames, limit, logType);
4248      case "BALANCER_DECISION":
4249        if (ServerType.REGION_SERVER.equals(serverType)) {
4250          throw new IllegalArgumentException(
4251            "Balancer Decision logs are not maintained by HRegionServer");
4252        }
4253        return getBalancerDecisions(limit);
4254      case "BALANCER_REJECTION":
4255        if (ServerType.REGION_SERVER.equals(serverType)) {
4256          throw new IllegalArgumentException(
4257            "Balancer Rejection logs are not maintained by HRegionServer");
4258        }
4259        return getBalancerRejections(limit);
4260      default:
4261        return CompletableFuture.completedFuture(Collections.emptyList());
4262    }
4263  }
4264
4265  @Override
4266  public CompletableFuture<Void> flushMasterStore() {
4267    FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder();
4268    return this.<Void> newMasterCaller()
4269      .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
4270        Void> call(controller, stub, request.build(),
4271          (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
4272      .call();
4273  }
4274
4275  @Override
4276  public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) {
4277    GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder();
4278    return this.<List<String>> newAdminCaller()
4279      .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse,
4280        List<String>> adminCall(controller, stub, request.build(),
4281          (s, c, req, done) -> s.getCachedFilesList(c, req, done),
4282          resp -> resp.getCachedFilesList()))
4283      .serverName(serverName).call();
4284  }
4285}