001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 024 025import com.google.protobuf.Message; 026import com.google.protobuf.RpcChannel; 027import edu.umd.cs.findbugs.annotations.Nullable; 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collections; 032import java.util.EnumSet; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Optional; 037import java.util.Set; 038import java.util.concurrent.CompletableFuture; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.ConcurrentLinkedQueue; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.atomic.AtomicReference; 043import java.util.function.BiConsumer; 044import java.util.function.Consumer; 045import java.util.function.Function; 046import java.util.function.Supplier; 047import java.util.regex.Pattern; 048import java.util.stream.Collectors; 049import java.util.stream.Stream; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.hbase.AsyncMetaTableAccessor; 052import org.apache.hadoop.hbase.CacheEvictionStats; 053import org.apache.hadoop.hbase.CacheEvictionStatsAggregator; 054import org.apache.hadoop.hbase.ClusterMetrics; 055import org.apache.hadoop.hbase.ClusterMetrics.Option; 056import org.apache.hadoop.hbase.ClusterMetricsBuilder; 057import org.apache.hadoop.hbase.DoNotRetryIOException; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.HRegionLocation; 060import org.apache.hadoop.hbase.MetaTableAccessor; 061import org.apache.hadoop.hbase.MetaTableAccessor.QueryType; 062import org.apache.hadoop.hbase.NamespaceDescriptor; 063import org.apache.hadoop.hbase.RegionLocations; 064import org.apache.hadoop.hbase.RegionMetrics; 065import org.apache.hadoop.hbase.RegionMetricsBuilder; 066import org.apache.hadoop.hbase.ServerName; 067import org.apache.hadoop.hbase.TableExistsException; 068import org.apache.hadoop.hbase.TableName; 069import org.apache.hadoop.hbase.TableNotDisabledException; 070import org.apache.hadoop.hbase.TableNotEnabledException; 071import org.apache.hadoop.hbase.TableNotFoundException; 072import org.apache.hadoop.hbase.UnknownRegionException; 073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; 074import 
org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; 075import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; 076import org.apache.hadoop.hbase.client.Scan.ReadType; 077import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 078import org.apache.hadoop.hbase.client.replication.TableCFs; 079import org.apache.hadoop.hbase.client.security.SecurityCapability; 080import org.apache.hadoop.hbase.exceptions.DeserializationException; 081import org.apache.hadoop.hbase.ipc.HBaseRpcController; 082import org.apache.hadoop.hbase.quotas.QuotaFilter; 083import org.apache.hadoop.hbase.quotas.QuotaSettings; 084import org.apache.hadoop.hbase.quotas.QuotaTableUtil; 085import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 086import org.apache.hadoop.hbase.replication.ReplicationException; 087import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 088import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 089import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest; 090import org.apache.hadoop.hbase.security.access.Permission; 091import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil; 092import org.apache.hadoop.hbase.security.access.UserPermission; 093import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 094import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 095import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 096import org.apache.hadoop.hbase.util.Bytes; 097import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 098import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 099import org.apache.hadoop.hbase.util.Strings; 100import org.apache.yetus.audience.InterfaceAudience; 101import org.slf4j.Logger; 102import org.slf4j.LoggerFactory; 103 104import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 105import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 106import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 107import org.apache.hbase.thirdparty.io.netty.util.Timeout; 108import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 109import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 110 111import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 112import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 113import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; 114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse; 115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest; 116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse; 117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 124import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 161import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 197import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 231import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; 263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; 264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 266import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; 268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; 269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; 270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse; 275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; 276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; 277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; 279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; 285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; 293import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 294import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 295import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 296import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 297import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 298import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes; 299import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 300import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 301import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 
302import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 303import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 304import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 305import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 306import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 307import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 308import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 309import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest; 310import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse; 311import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; 312import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; 313import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest; 314import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse; 315import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 316import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 317import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 318import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 319import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest; 320import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse; 321import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 322import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 323import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 324 325/** 326 * The implementation of AsyncAdmin. 327 * <p> 328 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 329 * be finished inside the rpc framework thread, which means that the callbacks registered to the 330 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 331 * this class should not try to do time consuming tasks in the callbacks. 
332 * @since 2.0.0 333 * @see AsyncHBaseAdmin 334 * @see AsyncConnection#getAdmin() 335 * @see AsyncConnection#getAdminBuilder() 336 */ 337@InterfaceAudience.Private 338class RawAsyncHBaseAdmin implements AsyncAdmin { 339 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 340 341 private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class); 342 343 private final AsyncConnectionImpl connection; 344 345 private final HashedWheelTimer retryTimer; 346 347 private final AsyncTable<AdvancedScanResultConsumer> metaTable; 348 349 private final long rpcTimeoutNs; 350 351 private final long operationTimeoutNs; 352 353 private final long pauseNs; 354 355 private final long pauseNsForServerOverloaded; 356 357 private final int maxAttempts; 358 359 private final int startLogErrorsCnt; 360 361 private final NonceGenerator ng; 362 363 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer, 364 AsyncAdminBuilderBase builder) { 365 this.connection = connection; 366 this.retryTimer = retryTimer; 367 this.metaTable = connection.getTable(META_TABLE_NAME); 368 this.rpcTimeoutNs = builder.rpcTimeoutNs; 369 this.operationTimeoutNs = builder.operationTimeoutNs; 370 this.pauseNs = builder.pauseNs; 371 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 372 LOG.warn( 373 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 374 + " the normal pause value {} ms, use the greater one instead", 375 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 376 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 377 this.pauseNsForServerOverloaded = builder.pauseNs; 378 } else { 379 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 380 } 381 this.maxAttempts = builder.maxAttempts; 382 this.startLogErrorsCnt = builder.startLogErrorsCnt; 383 this.ng = connection.getNonceGenerator(); 384 } 385 386 private <T> MasterRequestCallerBuilder<T> newMasterCaller() { 387 return this.connection.callerFactory.<T> masterRequest() 388 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 389 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 390 .pause(pauseNs, TimeUnit.NANOSECONDS) 391 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 392 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 393 } 394 395 private <T> AdminRequestCallerBuilder<T> newAdminCaller() { 396 return this.connection.callerFactory.<T> adminRequest() 397 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 398 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 399 .pause(pauseNs, TimeUnit.NANOSECONDS) 400 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 401 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 402 } 403 404 @FunctionalInterface 405 private interface MasterRpcCall<RESP, REQ> { 406 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, 407 RpcCallback<RESP> done); 408 } 409 410 @FunctionalInterface 411 private interface AdminRpcCall<RESP, REQ> { 412 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req, 413 RpcCallback<RESP> done); 414 } 415 416 @FunctionalInterface 417 private interface Converter<D, S> { 418 D convert(S src) throws IOException; 419 } 420 421 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 422 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, 423 Converter<RESP, PRESP> respConverter) { 424 
CompletableFuture<RESP> future = new CompletableFuture<>(); 425 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 426 427 @Override 428 public void run(PRESP resp) { 429 if (controller.failed()) { 430 future.completeExceptionally(controller.getFailed()); 431 } else { 432 try { 433 future.complete(respConverter.convert(resp)); 434 } catch (IOException e) { 435 future.completeExceptionally(e); 436 } 437 } 438 } 439 }); 440 return future; 441 } 442 443 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, 444 AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall, 445 Converter<RESP, PRESP> respConverter) { 446 CompletableFuture<RESP> future = new CompletableFuture<>(); 447 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 448 449 @Override 450 public void run(PRESP resp) { 451 if (controller.failed()) { 452 future.completeExceptionally(controller.getFailed()); 453 } else { 454 try { 455 future.complete(respConverter.convert(resp)); 456 } catch (IOException e) { 457 future.completeExceptionally(e); 458 } 459 } 460 } 461 }); 462 return future; 463 } 464 465 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, 466 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 467 ProcedureBiConsumer consumer) { 468 return procedureCall(b -> { 469 }, preq, rpcCall, respConverter, consumer); 470 } 471 472 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq, 473 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 474 ProcedureBiConsumer consumer) { 475 return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer); 476 } 477 478 private <PREQ, PRESP> CompletableFuture<Void> procedureCall( 479 Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq, 480 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 481 ProcedureBiConsumer consumer) { 482 MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller, 483 stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)); 484 prioritySetter.accept(builder); 485 CompletableFuture<Long> procFuture = builder.call(); 486 CompletableFuture<Void> future = waitProcedureResult(procFuture); 487 addListener(future, consumer); 488 return future; 489 } 490 491 @Override 492 public CompletableFuture<Boolean> tableExists(TableName tableName) { 493 if (TableName.isMetaTableName(tableName)) { 494 return CompletableFuture.completedFuture(true); 495 } 496 return AsyncMetaTableAccessor.tableExists(metaTable, tableName); 497 } 498 499 @Override 500 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) { 501 return getTableDescriptors( 502 RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables)); 503 } 504 505 /** 506 * {@link #listTableDescriptors(boolean)} 507 */ 508 @Override 509 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern, 510 boolean includeSysTables) { 511 Preconditions.checkNotNull(pattern, 512 "pattern is null. If you don't specify a pattern, use listTables(boolean) instead"); 513 return getTableDescriptors( 514 RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables)); 515 } 516 517 @Override 518 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) { 519 Preconditions.checkNotNull(tableNames, 520 "tableNames is null. 
If you don't specify tableNames, " + "use listTables(boolean) instead"); 521 if (tableNames.isEmpty()) { 522 return CompletableFuture.completedFuture(Collections.emptyList()); 523 } 524 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames)); 525 } 526 527 private CompletableFuture<List<TableDescriptor>> 528 getTableDescriptors(GetTableDescriptorsRequest request) { 529 return this.<List<TableDescriptor>> newMasterCaller() 530 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 531 List<TableDescriptor>> call(controller, stub, request, 532 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 533 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 534 .call(); 535 } 536 537 @Override 538 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) { 539 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables)); 540 } 541 542 @Override 543 public CompletableFuture<List<TableName>> listTableNames(Pattern pattern, 544 boolean includeSysTables) { 545 Preconditions.checkNotNull(pattern, 546 "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead"); 547 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables)); 548 } 549 550 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) { 551 return this.<List<TableName>> newMasterCaller() 552 .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse, 553 List<TableName>> call(controller, stub, request, 554 (s, c, req, done) -> s.getTableNames(c, req, done), 555 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 556 .call(); 557 } 558 559 @Override 560 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) { 561 return this.<List<TableDescriptor>> newMasterCaller() 562 .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest, 563 ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub, 564 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 565 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done), 566 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 567 .call(); 568 } 569 570 @Override 571 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) { 572 return this.<List<TableDescriptor>> newMasterCaller() 573 .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest, 574 ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub, 575 ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 576 (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done), 577 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 578 .call(); 579 } 580 581 @Override 582 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) { 583 return this.<List<TableName>> newMasterCaller() 584 .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest, 585 ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub, 586 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 587 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done), 588 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList()))) 589 .call(); 590 } 591 592 @Override 593 public CompletableFuture<List<TableName>> listTableNamesByState(boolean 
isEnabled) { 594 return this.<List<TableName>> newMasterCaller() 595 .action((controller, stub) -> this.<ListTableNamesByStateRequest, 596 ListTableNamesByStateResponse, List<TableName>> call(controller, stub, 597 ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 598 (s, c, req, done) -> s.listTableNamesByState(c, req, done), 599 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 600 .call(); 601 } 602 603 @Override 604 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) { 605 CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); 606 addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName) 607 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 608 List<TableSchema>> call(controller, stub, 609 RequestConverter.buildGetTableDescriptorsRequest(tableName), 610 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 611 (resp) -> resp.getTableSchemaList())) 612 .call(), (tableSchemas, error) -> { 613 if (error != null) { 614 future.completeExceptionally(error); 615 return; 616 } 617 if (!tableSchemas.isEmpty()) { 618 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); 619 } else { 620 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); 621 } 622 }); 623 return future; 624 } 625 626 @Override 627 public CompletableFuture<Void> createTable(TableDescriptor desc) { 628 return createTable(desc.getTableName(), 629 RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce())); 630 } 631 632 @Override 633 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, 634 int numRegions) { 635 try { 636 return createTable(desc, getSplitKeys(startKey, endKey, numRegions)); 637 } catch (IllegalArgumentException e) { 638 return failedFuture(e); 639 } 640 } 641 642 @Override 643 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) { 644 Preconditions.checkNotNull(splitKeys, "splitKeys is null. 
If you don't specify splitKeys," 645 + " use createTable(TableDescriptor) instead"); 646 try { 647 verifySplitKeys(splitKeys); 648 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc, 649 splitKeys, ng.getNonceGroup(), ng.newNonce())); 650 } catch (IllegalArgumentException e) { 651 return failedFuture(e); 652 } 653 } 654 655 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) { 656 Preconditions.checkNotNull(tableName, "table name is null"); 657 return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request, 658 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), 659 new CreateTableProcedureBiConsumer(tableName)); 660 } 661 662 @Override 663 public CompletableFuture<Void> modifyTable(TableDescriptor desc) { 664 return modifyTable(desc, true); 665 } 666 667 public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) { 668 // TODO fill the request with reopenRegions 669 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(), 670 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), 671 ng.newNonce(), reopenRegions), 672 (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(), 673 new ModifyTableProcedureBiConsumer(this, desc.getTableName())); 674 } 675 676 @Override 677 public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) { 678 return this.<ModifyTableStoreFileTrackerRequest, 679 ModifyTableStoreFileTrackerResponse> procedureCall(tableName, 680 RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT, 681 ng.getNonceGroup(), ng.newNonce()), 682 (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done), 683 (resp) -> resp.getProcId(), 684 new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName)); 685 } 686 687 @Override 688 public CompletableFuture<Void> deleteTable(TableName tableName) { 689 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName, 690 RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 691 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), 692 new DeleteTableProcedureBiConsumer(tableName)); 693 } 694 695 @Override 696 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) { 697 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName, 698 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), 699 ng.newNonce()), 700 (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(), 701 new TruncateTableProcedureBiConsumer(tableName)); 702 } 703 704 @Override 705 public CompletableFuture<Void> enableTable(TableName tableName) { 706 return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName, 707 RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 708 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), 709 new EnableTableProcedureBiConsumer(tableName)); 710 } 711 712 @Override 713 public CompletableFuture<Void> disableTable(TableName tableName) { 714 return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName, 715 RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 716 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> 
resp.getProcId(), 717 new DisableTableProcedureBiConsumer(tableName)); 718 } 719 720 /** 721 * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using 722 * passed parameters. Sets error or boolean result ('true' if table matches the passed-in 723 * targetState). 724 */ 725 private static CompletableFuture<Boolean> completeCheckTableState( 726 CompletableFuture<Boolean> future, TableState tableState, Throwable error, 727 TableState.State targetState, TableName tableName) { 728 if (error != null) { 729 future.completeExceptionally(error); 730 } else { 731 if (tableState != null) { 732 future.complete(tableState.inStates(targetState)); 733 } else { 734 future.completeExceptionally(new TableNotFoundException(tableName)); 735 } 736 } 737 return future; 738 } 739 740 @Override 741 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { 742 if (TableName.isMetaTableName(tableName)) { 743 return CompletableFuture.completedFuture(true); 744 } 745 CompletableFuture<Boolean> future = new CompletableFuture<>(); 746 addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> { 747 completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error, 748 TableState.State.ENABLED, tableName); 749 }); 750 return future; 751 } 752 753 @Override 754 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { 755 if (TableName.isMetaTableName(tableName)) { 756 return CompletableFuture.completedFuture(false); 757 } 758 CompletableFuture<Boolean> future = new CompletableFuture<>(); 759 addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> { 760 completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error, 761 TableState.State.DISABLED, tableName); 762 }); 763 return future; 764 } 765 766 @Override 767 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { 768 return isTableAvailable(tableName, Optional.empty()); 769 } 770 771 @Override 772 public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) { 773 Preconditions.checkNotNull(splitKeys, "splitKeys is null. 
If you don't specify splitKeys," 774 + " use isTableAvailable(TableName) instead"); 775 return isTableAvailable(tableName, Optional.of(splitKeys)); 776 } 777 778 private CompletableFuture<Boolean> isTableAvailable(TableName tableName, 779 Optional<byte[][]> splitKeys) { 780 if (TableName.isMetaTableName(tableName)) { 781 return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream 782 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); 783 } 784 CompletableFuture<Boolean> future = new CompletableFuture<>(); 785 addListener(isTableEnabled(tableName), (enabled, error) -> { 786 if (error != null) { 787 if (error instanceof TableNotFoundException) { 788 future.complete(false); 789 } else { 790 future.completeExceptionally(error); 791 } 792 return; 793 } 794 if (!enabled) { 795 future.complete(false); 796 } else { 797 addListener(AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName), 798 (locations, error1) -> { 799 if (error1 != null) { 800 future.completeExceptionally(error1); 801 return; 802 } 803 List<HRegionLocation> notDeployedRegions = locations.stream() 804 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); 805 if (notDeployedRegions.size() > 0) { 806 if (LOG.isDebugEnabled()) { 807 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); 808 } 809 future.complete(false); 810 return; 811 } 812 813 Optional<Boolean> available = 814 splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys)); 815 future.complete(available.orElse(true)); 816 }); 817 } 818 }); 819 return future; 820 } 821 822 private boolean compareRegionsWithSplitKeys(List<HRegionLocation> locations, byte[][] splitKeys) { 823 int regionCount = 0; 824 for (HRegionLocation location : locations) { 825 RegionInfo info = location.getRegion(); 826 if (Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { 827 regionCount++; 828 continue; 829 } 830 for (byte[] splitKey : splitKeys) { 831 // Just check if the splitkey is available 832 if (Bytes.equals(info.getStartKey(), splitKey)) { 833 regionCount++; 834 break; 835 } 836 } 837 } 838 return regionCount == splitKeys.length + 1; 839 } 840 841 @Override 842 public CompletableFuture<Void> addColumnFamily(TableName tableName, 843 ColumnFamilyDescriptor columnFamily) { 844 return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName, 845 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 846 ng.newNonce()), 847 (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), 848 new AddColumnFamilyProcedureBiConsumer(tableName)); 849 } 850 851 @Override 852 public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) { 853 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName, 854 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 855 ng.newNonce()), 856 (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(), 857 new DeleteColumnFamilyProcedureBiConsumer(tableName)); 858 } 859 860 @Override 861 public CompletableFuture<Void> modifyColumnFamily(TableName tableName, 862 ColumnFamilyDescriptor columnFamily) { 863 return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName, 864 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 865 ng.newNonce()), 866 (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> 
resp.getProcId(), 867 new ModifyColumnFamilyProcedureBiConsumer(tableName)); 868 } 869 870 @Override 871 public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName, 872 byte[] family, String dstSFT) { 873 return this.<ModifyColumnStoreFileTrackerRequest, 874 ModifyColumnStoreFileTrackerResponse> procedureCall(tableName, 875 RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT, 876 ng.getNonceGroup(), ng.newNonce()), 877 (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done), 878 (resp) -> resp.getProcId(), 879 new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName)); 880 } 881 882 @Override 883 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) { 884 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall( 885 RequestConverter.buildCreateNamespaceRequest(descriptor), 886 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(), 887 new CreateNamespaceProcedureBiConsumer(descriptor.getName())); 888 } 889 890 @Override 891 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) { 892 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall( 893 RequestConverter.buildModifyNamespaceRequest(descriptor), 894 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(), 895 new ModifyNamespaceProcedureBiConsumer(descriptor.getName())); 896 } 897 898 @Override 899 public CompletableFuture<Void> deleteNamespace(String name) { 900 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall( 901 RequestConverter.buildDeleteNamespaceRequest(name), 902 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(), 903 new DeleteNamespaceProcedureBiConsumer(name)); 904 } 905 906 @Override 907 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) { 908 return this.<NamespaceDescriptor> newMasterCaller() 909 .action((controller, stub) -> this.<GetNamespaceDescriptorRequest, 910 GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub, 911 RequestConverter.buildGetNamespaceDescriptorRequest(name), 912 (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), 913 (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))) 914 .call(); 915 } 916 917 @Override 918 public CompletableFuture<List<String>> listNamespaces() { 919 return this.<List<String>> newMasterCaller() 920 .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse, 921 List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(), 922 (s, c, req, done) -> s.listNamespaces(c, req, done), 923 (resp) -> resp.getNamespaceNameList())) 924 .call(); 925 } 926 927 @Override 928 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() { 929 return this.<List<NamespaceDescriptor>> newMasterCaller() 930 .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest, 931 ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub, 932 ListNamespaceDescriptorsRequest.newBuilder().build(), 933 (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done), 934 (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))) 935 .call(); 936 } 937 938 @Override 939 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) { 940 return this.<List<RegionInfo>> newAdminCaller() 941 .action((controller, stub) -> this.<GetOnlineRegionRequest, 
GetOnlineRegionResponse, 942 List<RegionInfo>> adminCall(controller, stub, 943 RequestConverter.buildGetOnlineRegionRequest(), 944 (s, c, req, done) -> s.getOnlineRegion(c, req, done), 945 resp -> ProtobufUtil.getRegionInfos(resp))) 946 .serverName(serverName).call(); 947 } 948 949 @Override 950 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) { 951 if (tableName.equals(META_TABLE_NAME)) { 952 return connection.registry.getMetaRegionLocations() 953 .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion) 954 .collect(Collectors.toList())); 955 } else { 956 return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply( 957 locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); 958 } 959 } 960 961 @Override 962 public CompletableFuture<Void> flush(TableName tableName) { 963 return flush(tableName, Collections.emptyList()); 964 } 965 966 @Override 967 public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) { 968 return flush(tableName, Collections.singletonList(columnFamily)); 969 } 970 971 @Override 972 public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) { 973 // This is for keeping compatibility with old implementation. 974 // If the server version is lower than the client version, it's possible that the 975 // flushTable method is not present in the server side, if so, we need to fall back 976 // to the old implementation. 977 List<byte[]> columnFamilies = columnFamilyList.stream() 978 .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList()); 979 FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies, 980 ng.getNonceGroup(), ng.newNonce()); 981 CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall( 982 tableName, request, (s, c, req, done) -> s.flushTable(c, req, done), 983 (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName)); 984 CompletableFuture<Void> future = new CompletableFuture<>(); 985 addListener(procFuture, (ret, error) -> { 986 if (error != null) { 987 if (error instanceof TableNotFoundException || error instanceof TableNotEnabledException) { 988 future.completeExceptionally(error); 989 } else if (error instanceof DoNotRetryIOException) { 990 // usually this is caused by the method is not present on the server or 991 // the hbase hadoop version does not match the running hadoop version. 992 // if that happens, we need fall back to the old flush implementation. 993 LOG.info("Unrecoverable error in master side. 
Fallback to FlushTableProcedure V1", error); 994 legacyFlush(future, tableName, columnFamilies); 995 } else { 996 future.completeExceptionally(error); 997 } 998 } else { 999 future.complete(ret); 1000 } 1001 }); 1002 return future; 1003 } 1004 1005 private void legacyFlush(CompletableFuture<Void> future, TableName tableName, 1006 List<byte[]> columnFamilies) { 1007 addListener(tableExists(tableName), (exists, err) -> { 1008 if (err != null) { 1009 future.completeExceptionally(err); 1010 } else if (!exists) { 1011 future.completeExceptionally(new TableNotFoundException(tableName)); 1012 } else { 1013 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { 1014 if (err2 != null) { 1015 future.completeExceptionally(err2); 1016 } else if (!tableEnabled) { 1017 future.completeExceptionally(new TableNotEnabledException(tableName)); 1018 } else { 1019 Map<String, String> props = new HashMap<>(); 1020 if (columnFamilies != null && !columnFamilies.isEmpty()) { 1021 props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER 1022 .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList()))); 1023 } 1024 addListener( 1025 execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props), 1026 (ret, err3) -> { 1027 if (err3 != null) { 1028 future.completeExceptionally(err3); 1029 } else { 1030 future.complete(ret); 1031 } 1032 }); 1033 } 1034 }); 1035 } 1036 }); 1037 } 1038 1039 @Override 1040 public CompletableFuture<Void> flushRegion(byte[] regionName) { 1041 return flushRegion(regionName, null); 1042 } 1043 1044 @Override 1045 public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) { 1046 CompletableFuture<Void> future = new CompletableFuture<>(); 1047 addListener(getRegionLocation(regionName), (location, err) -> { 1048 if (err != null) { 1049 future.completeExceptionally(err); 1050 return; 1051 } 1052 ServerName serverName = location.getServerName(); 1053 if (serverName == null) { 1054 future 1055 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1056 return; 1057 } 1058 addListener(flush(serverName, location.getRegion(), columnFamily), (ret, err2) -> { 1059 if (err2 != null) { 1060 future.completeExceptionally(err2); 1061 } else { 1062 future.complete(ret); 1063 } 1064 }); 1065 }); 1066 return future; 1067 } 1068 1069 private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo, 1070 byte[] columnFamily) { 1071 return this.<Void> newAdminCaller().serverName(serverName) 1072 .action( 1073 (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, 1074 Void> adminCall(controller, stub, RequestConverter 1075 .buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily, false), 1076 (s, c, req, done) -> s.flushRegion(c, req, done), resp -> null)) 1077 .call(); 1078 } 1079 1080 @Override 1081 public CompletableFuture<Void> flushRegionServer(ServerName sn) { 1082 CompletableFuture<Void> future = new CompletableFuture<>(); 1083 addListener(getRegions(sn), (hRegionInfos, err) -> { 1084 if (err != null) { 1085 future.completeExceptionally(err); 1086 return; 1087 } 1088 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1089 if (hRegionInfos != null) { 1090 hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, null))); 1091 } 1092 addListener(CompletableFuture.allOf( 1093 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1094 if (err2 != null) { 1095 future.completeExceptionally(err2); 
          } else {
            future.complete(ret);
          }
        });
    });
    return future;
  }

  @Override
  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
    return compact(tableName, null, false, compactType);
  }

  @Override
  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
    CompactType compactType) {
    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
      + "If you don't specify a columnFamily, use compact(TableName) instead");
    return compact(tableName, columnFamily, false, compactType);
  }

  @Override
  public CompletableFuture<Void> compactRegion(byte[] regionName) {
    return compactRegion(regionName, null, false);
  }

  @Override
  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
      + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
    return compactRegion(regionName, columnFamily, false);
  }

  @Override
  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
    return compact(tableName, null, true, compactType);
  }

  @Override
  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
    CompactType compactType) {
    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
      + " If you don't specify a columnFamily, use majorCompact(TableName, CompactType) instead");
    return compact(tableName, columnFamily, true, compactType);
  }

  @Override
  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
    return compactRegion(regionName, null, true);
  }

  @Override
  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1150 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead"); 1151 return compactRegion(regionName, columnFamily, true); 1152 } 1153 1154 @Override 1155 public CompletableFuture<Void> compactRegionServer(ServerName sn) { 1156 return compactRegionServer(sn, false); 1157 } 1158 1159 @Override 1160 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { 1161 return compactRegionServer(sn, true); 1162 } 1163 1164 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { 1165 CompletableFuture<Void> future = new CompletableFuture<>(); 1166 addListener(getRegions(sn), (hRegionInfos, err) -> { 1167 if (err != null) { 1168 future.completeExceptionally(err); 1169 return; 1170 } 1171 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1172 if (hRegionInfos != null) { 1173 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); 1174 } 1175 addListener(CompletableFuture.allOf( 1176 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1177 if (err2 != null) { 1178 future.completeExceptionally(err2); 1179 } else { 1180 future.complete(ret); 1181 } 1182 }); 1183 }); 1184 return future; 1185 } 1186 1187 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, 1188 boolean major) { 1189 CompletableFuture<Void> future = new CompletableFuture<>(); 1190 addListener(getRegionLocation(regionName), (location, err) -> { 1191 if (err != null) { 1192 future.completeExceptionally(err); 1193 return; 1194 } 1195 ServerName serverName = location.getServerName(); 1196 if (serverName == null) { 1197 future 1198 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1199 return; 1200 } 1201 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), 1202 (ret, err2) -> { 1203 if (err2 != null) { 1204 future.completeExceptionally(err2); 1205 } else { 1206 future.complete(ret); 1207 } 1208 }); 1209 }); 1210 return future; 1211 } 1212 1213 /** 1214 * List all region locations for the specific table. 
   */
  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
    if (TableName.META_TABLE_NAME.equals(tableName)) {
      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
        if (err != null) {
          future.completeExceptionally(err);
        } else if (
          metaRegions == null || metaRegions.isEmpty()
            || metaRegions.getDefaultRegionLocation() == null
        ) {
          future.completeExceptionally(new IOException("meta region is not found"));
        } else {
          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
        }
      });
      return future;
    } else {
      // For a non-meta table, we fetch all locations by scanning the hbase:meta table
      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
    }
  }

  /**
   * Compact the column family of a table. This is an asynchronous operation: the returned future
   * completes once the compaction requests have been submitted to the region servers, not when
   * the compactions themselves finish, even if CompletableFuture.get() is called.
   */
  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
    CompactType compactType) {
    CompletableFuture<Void> future = new CompletableFuture<>();

    switch (compactType) {
      case MOB:
        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
            if (err2 != null) {
              future.completeExceptionally(err2);
            } else {
              future.complete(ret);
            }
          });
        });
        break;
      case NORMAL:
        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          if (locations == null || locations.isEmpty()) {
            future.completeExceptionally(new TableNotFoundException(tableName));
            return;
          }
          CompletableFuture<?>[] compactFutures =
            locations.stream().filter(l -> l.getRegion() != null)
              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
              .toArray(CompletableFuture<?>[]::new);
          // future completes only after all of the compact futures have completed.
          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
            if (err2 != null) {
              future.completeExceptionally(err2);
            } else {
              future.complete(ret);
            }
          });
        });
        break;
      default:
        throw new IllegalArgumentException("Unknown compactType: " + compactType);
    }
    return future;
  }

  /**
   * Compact the region at the specific region server.
   */
  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
    final boolean major, byte[] columnFamily) {
    return this.<Void> newAdminCaller().serverName(sn)
      .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse,
        Void> adminCall(controller, stub,
          RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily),
          (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
      .call();
  }

  private byte[] toEncodeRegionName(byte[] regionName) {
    return RegionInfo.isEncodedRegionName(regionName)
      ?
regionName 1308 : Bytes.toBytes(RegionInfo.encodeRegionName(regionName)); 1309 } 1310 1311 private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName, 1312 CompletableFuture<TableName> result) { 1313 addListener(getRegionLocation(encodeRegionName), (location, err) -> { 1314 if (err != null) { 1315 result.completeExceptionally(err); 1316 return; 1317 } 1318 RegionInfo regionInfo = location.getRegion(); 1319 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1320 result.completeExceptionally( 1321 new IllegalArgumentException("Can't invoke merge on non-default regions directly")); 1322 return; 1323 } 1324 if (!tableName.compareAndSet(null, regionInfo.getTable())) { 1325 if (!tableName.get().equals(regionInfo.getTable())) { 1326 // tables of this two region should be same. 1327 result.completeExceptionally( 1328 new IllegalArgumentException("Cannot merge regions from two different tables " 1329 + tableName.get() + " and " + regionInfo.getTable())); 1330 } else { 1331 result.complete(tableName.get()); 1332 } 1333 } 1334 }); 1335 } 1336 1337 private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) { 1338 AtomicReference<TableName> tableNameRef = new AtomicReference<>(); 1339 CompletableFuture<TableName> future = new CompletableFuture<>(); 1340 for (byte[] encodedRegionName : encodedRegionNames) { 1341 checkAndGetTableName(encodedRegionName, tableNameRef, future); 1342 } 1343 return future; 1344 } 1345 1346 @Override 1347 public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) { 1348 return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE); 1349 } 1350 1351 @Override 1352 public CompletableFuture<Boolean> isMergeEnabled() { 1353 return isSplitOrMergeOn(MasterSwitchType.MERGE); 1354 } 1355 1356 @Override 1357 public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) { 1358 return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT); 1359 } 1360 1361 @Override 1362 public CompletableFuture<Boolean> isSplitEnabled() { 1363 return isSplitOrMergeOn(MasterSwitchType.SPLIT); 1364 } 1365 1366 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous, 1367 MasterSwitchType switchType) { 1368 SetSplitOrMergeEnabledRequest request = 1369 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType); 1370 return this.<Boolean> newMasterCaller() 1371 .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest, 1372 SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1373 (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done), 1374 (resp) -> resp.getPrevValueList().get(0))) 1375 .call(); 1376 } 1377 1378 private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) { 1379 IsSplitOrMergeEnabledRequest request = 1380 RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType); 1381 return this.<Boolean> newMasterCaller() 1382 .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest, 1383 IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1384 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled())) 1385 .call(); 1386 } 1387 1388 @Override 1389 public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) { 1390 if (nameOfRegionsToMerge.size() < 2) { 1391 return failedFuture(new IllegalArgumentException( 1392 "Can not merge only " + 
nameOfRegionsToMerge.size() + " region")); 1393 } 1394 CompletableFuture<Void> future = new CompletableFuture<>(); 1395 byte[][] encodedNameOfRegionsToMerge = 1396 nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new); 1397 1398 addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> { 1399 if (err != null) { 1400 future.completeExceptionally(err); 1401 return; 1402 } 1403 1404 final MergeTableRegionsRequest request; 1405 try { 1406 request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge, 1407 forcible, ng.getNonceGroup(), ng.newNonce()); 1408 } catch (DeserializationException e) { 1409 future.completeExceptionally(e); 1410 return; 1411 } 1412 1413 addListener( 1414 this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions, 1415 MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)), 1416 (ret, err2) -> { 1417 if (err2 != null) { 1418 future.completeExceptionally(err2); 1419 } else { 1420 future.complete(ret); 1421 } 1422 }); 1423 }); 1424 return future; 1425 } 1426 1427 @Override 1428 public CompletableFuture<Void> split(TableName tableName) { 1429 CompletableFuture<Void> future = new CompletableFuture<>(); 1430 addListener(tableExists(tableName), (exist, error) -> { 1431 if (error != null) { 1432 future.completeExceptionally(error); 1433 return; 1434 } 1435 if (!exist) { 1436 future.completeExceptionally(new TableNotFoundException(tableName)); 1437 return; 1438 } 1439 addListener( 1440 metaTable 1441 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) 1442 .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION)) 1443 .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))), 1444 (results, err2) -> { 1445 if (err2 != null) { 1446 future.completeExceptionally(err2); 1447 return; 1448 } 1449 if (results != null && !results.isEmpty()) { 1450 List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); 1451 for (Result r : results) { 1452 if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) { 1453 continue; 1454 } 1455 RegionLocations rl = MetaTableAccessor.getRegionLocations(r); 1456 if (rl != null) { 1457 for (HRegionLocation h : rl.getRegionLocations()) { 1458 if (h != null && h.getServerName() != null) { 1459 RegionInfo hri = h.getRegion(); 1460 if ( 1461 hri == null || hri.isSplitParent() 1462 || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID 1463 ) { 1464 continue; 1465 } 1466 splitFutures.add(split(hri, null)); 1467 } 1468 } 1469 } 1470 } 1471 addListener( 1472 CompletableFuture 1473 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])), 1474 (ret, exception) -> { 1475 if (exception != null) { 1476 future.completeExceptionally(exception); 1477 return; 1478 } 1479 future.complete(ret); 1480 }); 1481 } else { 1482 future.complete(null); 1483 } 1484 }); 1485 }); 1486 return future; 1487 } 1488 1489 @Override 1490 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { 1491 CompletableFuture<Void> result = new CompletableFuture<>(); 1492 if (splitPoint == null) { 1493 return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); 1494 } 1495 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true), 1496 (loc, err) -> { 1497 if (err != null) { 1498 result.completeExceptionally(err); 1499 } else if (loc == null || loc.getRegion() == null) { 1500 
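          // no region of the table contains the given split point row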
result.completeExceptionally(new IllegalArgumentException( 1501 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); 1502 } else { 1503 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> { 1504 if (err2 != null) { 1505 result.completeExceptionally(err2); 1506 } else { 1507 result.complete(ret); 1508 } 1509 1510 }); 1511 } 1512 }); 1513 return result; 1514 } 1515 1516 @Override 1517 public CompletableFuture<Void> splitRegion(byte[] regionName) { 1518 CompletableFuture<Void> future = new CompletableFuture<>(); 1519 addListener(getRegionLocation(regionName), (location, err) -> { 1520 if (err != null) { 1521 future.completeExceptionally(err); 1522 return; 1523 } 1524 RegionInfo regionInfo = location.getRegion(); 1525 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1526 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1527 + "Replicas are auto-split when their primary is split.")); 1528 return; 1529 } 1530 ServerName serverName = location.getServerName(); 1531 if (serverName == null) { 1532 future 1533 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1534 return; 1535 } 1536 addListener(split(regionInfo, null), (ret, err2) -> { 1537 if (err2 != null) { 1538 future.completeExceptionally(err2); 1539 } else { 1540 future.complete(ret); 1541 } 1542 }); 1543 }); 1544 return future; 1545 } 1546 1547 @Override 1548 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) { 1549 Preconditions.checkNotNull(splitPoint, 1550 "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead"); 1551 CompletableFuture<Void> future = new CompletableFuture<>(); 1552 addListener(getRegionLocation(regionName), (location, err) -> { 1553 if (err != null) { 1554 future.completeExceptionally(err); 1555 return; 1556 } 1557 RegionInfo regionInfo = location.getRegion(); 1558 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1559 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. 
" 1560 + "Replicas are auto-split when their primary is split.")); 1561 return; 1562 } 1563 ServerName serverName = location.getServerName(); 1564 if (serverName == null) { 1565 future 1566 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1567 return; 1568 } 1569 if ( 1570 regionInfo.getStartKey() != null 1571 && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0 1572 ) { 1573 future.completeExceptionally( 1574 new IllegalArgumentException("should not give a splitkey which equals to startkey!")); 1575 return; 1576 } 1577 addListener(split(regionInfo, splitPoint), (ret, err2) -> { 1578 if (err2 != null) { 1579 future.completeExceptionally(err2); 1580 } else { 1581 future.complete(ret); 1582 } 1583 }); 1584 }); 1585 return future; 1586 } 1587 1588 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { 1589 CompletableFuture<Void> future = new CompletableFuture<>(); 1590 TableName tableName = hri.getTable(); 1591 final SplitTableRegionRequest request; 1592 try { 1593 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), 1594 ng.newNonce()); 1595 } catch (DeserializationException e) { 1596 future.completeExceptionally(e); 1597 return future; 1598 } 1599 1600 addListener( 1601 this.procedureCall(tableName, request, MasterService.Interface::splitRegion, 1602 SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)), 1603 (ret, err2) -> { 1604 if (err2 != null) { 1605 future.completeExceptionally(err2); 1606 } else { 1607 future.complete(ret); 1608 } 1609 }); 1610 return future; 1611 } 1612 1613 @Override 1614 public CompletableFuture<Void> truncateRegion(byte[] regionName) { 1615 CompletableFuture<Void> future = new CompletableFuture<>(); 1616 addListener(getRegionLocation(regionName), (location, err) -> { 1617 if (err != null) { 1618 future.completeExceptionally(err); 1619 return; 1620 } 1621 RegionInfo regionInfo = location.getRegion(); 1622 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1623 future.completeExceptionally(new IllegalArgumentException( 1624 "Can't truncate replicas directly.Replicas are auto-truncated " 1625 + "when their primary is truncated.")); 1626 return; 1627 } 1628 ServerName serverName = location.getServerName(); 1629 if (serverName == null) { 1630 future 1631 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1632 return; 1633 } 1634 addListener(truncateRegion(regionInfo), (ret, err2) -> { 1635 if (err2 != null) { 1636 future.completeExceptionally(err2); 1637 } else { 1638 future.complete(ret); 1639 } 1640 }); 1641 }); 1642 return future; 1643 } 1644 1645 private CompletableFuture<Void> truncateRegion(final RegionInfo hri) { 1646 CompletableFuture<Void> future = new CompletableFuture<>(); 1647 TableName tableName = hri.getTable(); 1648 final MasterProtos.TruncateRegionRequest request; 1649 try { 1650 request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce()); 1651 } catch (DeserializationException e) { 1652 future.completeExceptionally(e); 1653 return future; 1654 } 1655 addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion, 1656 MasterProtos.TruncateRegionResponse::getProcId, 1657 new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> { 1658 if (err2 != null) { 1659 future.completeExceptionally(err2); 1660 } else { 1661 future.complete(ret); 1662 } 1663 }); 1664 return future; 1665 } 1666 
1667 @Override 1668 public CompletableFuture<Void> assign(byte[] regionName) { 1669 CompletableFuture<Void> future = new CompletableFuture<>(); 1670 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1671 if (err != null) { 1672 future.completeExceptionally(err); 1673 return; 1674 } 1675 addListener( 1676 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1677 .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( 1678 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), 1679 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)) 1680 .call(), 1681 (ret, err2) -> { 1682 if (err2 != null) { 1683 future.completeExceptionally(err2); 1684 } else { 1685 future.complete(ret); 1686 } 1687 }); 1688 }); 1689 return future; 1690 } 1691 1692 @Override 1693 public CompletableFuture<Void> unassign(byte[] regionName) { 1694 CompletableFuture<Void> future = new CompletableFuture<>(); 1695 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1696 if (err != null) { 1697 future.completeExceptionally(err); 1698 return; 1699 } 1700 addListener( 1701 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1702 .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse, 1703 Void> call(controller, stub, 1704 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()), 1705 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)) 1706 .call(), 1707 (ret, err2) -> { 1708 if (err2 != null) { 1709 future.completeExceptionally(err2); 1710 } else { 1711 future.complete(ret); 1712 } 1713 }); 1714 }); 1715 return future; 1716 } 1717 1718 @Override 1719 public CompletableFuture<Void> offline(byte[] regionName) { 1720 CompletableFuture<Void> future = new CompletableFuture<>(); 1721 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1722 if (err != null) { 1723 future.completeExceptionally(err); 1724 return; 1725 } 1726 addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1727 .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call( 1728 controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), 1729 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)) 1730 .call(), (ret, err2) -> { 1731 if (err2 != null) { 1732 future.completeExceptionally(err2); 1733 } else { 1734 future.complete(ret); 1735 } 1736 }); 1737 }); 1738 return future; 1739 } 1740 1741 @Override 1742 public CompletableFuture<Void> move(byte[] regionName) { 1743 CompletableFuture<Void> future = new CompletableFuture<>(); 1744 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1745 if (err != null) { 1746 future.completeExceptionally(err); 1747 return; 1748 } 1749 addListener( 1750 moveRegion(regionInfo, 1751 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), 1752 (ret, err2) -> { 1753 if (err2 != null) { 1754 future.completeExceptionally(err2); 1755 } else { 1756 future.complete(ret); 1757 } 1758 }); 1759 }); 1760 return future; 1761 } 1762 1763 @Override 1764 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) { 1765 Preconditions.checkNotNull(destServerName, 1766 "destServerName is null. 
If you don't specify a destServerName, use move(byte[]) instead"); 1767 CompletableFuture<Void> future = new CompletableFuture<>(); 1768 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1769 if (err != null) { 1770 future.completeExceptionally(err); 1771 return; 1772 } 1773 addListener( 1774 moveRegion(regionInfo, RequestConverter 1775 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), 1776 (ret, err2) -> { 1777 if (err2 != null) { 1778 future.completeExceptionally(err2); 1779 } else { 1780 future.complete(ret); 1781 } 1782 }); 1783 }); 1784 return future; 1785 } 1786 1787 private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) { 1788 return this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1789 .action( 1790 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller, 1791 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) 1792 .call(); 1793 } 1794 1795 @Override 1796 public CompletableFuture<Void> setQuota(QuotaSettings quota) { 1797 return this.<Void> newMasterCaller() 1798 .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, 1799 stub, QuotaSettings.buildSetQuotaRequestProto(quota), 1800 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)) 1801 .call(); 1802 } 1803 1804 @Override 1805 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { 1806 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); 1807 Scan scan = QuotaTableUtil.makeScan(filter); 1808 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan, 1809 new AdvancedScanResultConsumer() { 1810 List<QuotaSettings> settings = new ArrayList<>(); 1811 1812 @Override 1813 public void onNext(Result[] results, ScanController controller) { 1814 for (Result result : results) { 1815 try { 1816 QuotaTableUtil.parseResultToCollection(result, settings); 1817 } catch (IOException e) { 1818 controller.terminate(); 1819 future.completeExceptionally(e); 1820 } 1821 } 1822 } 1823 1824 @Override 1825 public void onError(Throwable error) { 1826 future.completeExceptionally(error); 1827 } 1828 1829 @Override 1830 public void onComplete() { 1831 future.complete(settings); 1832 } 1833 }); 1834 return future; 1835 } 1836 1837 @Override 1838 public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, 1839 boolean enabled) { 1840 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall( 1841 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), 1842 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1843 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); 1844 } 1845 1846 @Override 1847 public CompletableFuture<Void> removeReplicationPeer(String peerId) { 1848 return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall( 1849 RequestConverter.buildRemoveReplicationPeerRequest(peerId), 1850 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1851 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); 1852 } 1853 1854 @Override 1855 public CompletableFuture<Void> enableReplicationPeer(String peerId) { 1856 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall( 1857 RequestConverter.buildEnableReplicationPeerRequest(peerId), 1858 (s, 
c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
  }

  @Override
  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
      RequestConverter.buildDisableReplicationPeerRequest(peerId),
      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
  }

  @Override
  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
    return this.<ReplicationPeerConfig> newMasterCaller()
      .action((controller, stub) -> this.<GetReplicationPeerConfigRequest,
        GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub,
          RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
          (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
          (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
      .call();
  }

  @Override
  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
    ReplicationPeerConfig peerConfig) {
    return this.<UpdateReplicationPeerConfigRequest,
      UpdateReplicationPeerConfigResponse> procedureCall(
        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
        (resp) -> resp.getProcId(),
        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
  }

  @Override
  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
    Map<TableName, List<String>> tableCfs) {
    if (tableCfs == null) {
      return failedFuture(new ReplicationException("tableCfs is null"));
    }

    CompletableFuture<Void> future = new CompletableFuture<Void>();
    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
      if (!completeExceptionally(future, error)) {
        ReplicationPeerConfig newPeerConfig =
          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
          // complete exceptionally if the update itself failed
          if (!completeExceptionally(future, err)) {
            future.complete(result);
          }
        });
      }
    });
    return future;
  }

  @Override
  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
    Map<TableName, List<String>> tableCfs) {
    if (tableCfs == null) {
      return failedFuture(new ReplicationException("tableCfs is null"));
    }

    CompletableFuture<Void> future = new CompletableFuture<Void>();
    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
      if (!completeExceptionally(future, error)) {
        ReplicationPeerConfig newPeerConfig = null;
        try {
          newPeerConfig = ReplicationPeerConfigUtil
            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
        } catch (ReplicationException e) {
          future.completeExceptionally(e);
          return;
        }
        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
          // complete exceptionally if the update itself failed
          if (!completeExceptionally(future, err)) {
            future.complete(result);
          }
        });
      }
    });
    return future;
  }

  @Override
  public CompletableFuture<List<ReplicationPeerDescription>>
listReplicationPeers() { 1944 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); 1945 } 1946 1947 @Override 1948 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { 1949 Preconditions.checkNotNull(pattern, 1950 "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead"); 1951 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern)); 1952 } 1953 1954 private CompletableFuture<List<ReplicationPeerDescription>> 1955 listReplicationPeers(ListReplicationPeersRequest request) { 1956 return this.<List<ReplicationPeerDescription>> newMasterCaller() 1957 .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse, 1958 List<ReplicationPeerDescription>> call(controller, stub, request, 1959 (s, c, req, done) -> s.listReplicationPeers(c, req, done), 1960 (resp) -> resp.getPeerDescList().stream() 1961 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) 1962 .collect(Collectors.toList()))) 1963 .call(); 1964 } 1965 1966 @Override 1967 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { 1968 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); 1969 addListener(listTableDescriptors(), (tables, error) -> { 1970 if (!completeExceptionally(future, error)) { 1971 List<TableCFs> replicatedTableCFs = new ArrayList<>(); 1972 tables.forEach(table -> { 1973 Map<String, Integer> cfs = new HashMap<>(); 1974 Stream.of(table.getColumnFamilies()) 1975 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) 1976 .forEach(column -> { 1977 cfs.put(column.getNameAsString(), column.getScope()); 1978 }); 1979 if (!cfs.isEmpty()) { 1980 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); 1981 } 1982 }); 1983 future.complete(replicatedTableCFs); 1984 } 1985 }); 1986 return future; 1987 } 1988 1989 @Override 1990 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { 1991 SnapshotProtos.SnapshotDescription snapshot = 1992 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 1993 try { 1994 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 1995 } catch (IllegalArgumentException e) { 1996 return failedFuture(e); 1997 } 1998 CompletableFuture<Void> future = new CompletableFuture<>(); 1999 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot) 2000 .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(); 2001 addListener(this.<SnapshotResponse> newMasterCaller() 2002 .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call( 2003 controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp)) 2004 .call(), (resp, err) -> { 2005 if (err != null) { 2006 future.completeExceptionally(err); 2007 return; 2008 } 2009 waitSnapshotFinish(snapshotDesc, future, resp); 2010 }); 2011 return future; 2012 } 2013 2014 // This is for keeping compatibility with old implementation. 2015 // If there is a procId field in the response, then the snapshot will be operated with a 2016 // SnapshotProcedure, otherwise the snapshot will be coordinated by zk. 
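  // In the zk-coordinated case there is no procedure id to track, so the polling task below keeps
  // calling isSnapshotFinished() with an increasing pause (capped at expectedTimeout / maxAttempts)
  // until the expected timeout elapses, then fails the future with a SnapshotCreationException.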
2017 private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future, 2018 SnapshotResponse resp) { 2019 if (resp.hasProcId()) { 2020 getProcedureResult(resp.getProcId(), future, 0); 2021 addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName())); 2022 } else { 2023 long expectedTimeout = resp.getExpectedTimeout(); 2024 TimerTask pollingTask = new TimerTask() { 2025 int tries = 0; 2026 long startTime = EnvironmentEdgeManager.currentTime(); 2027 long endTime = startTime + expectedTimeout; 2028 long maxPauseTime = expectedTimeout / maxAttempts; 2029 2030 @Override 2031 public void run(Timeout timeout) throws Exception { 2032 if (EnvironmentEdgeManager.currentTime() < endTime) { 2033 addListener(isSnapshotFinished(snapshot), (done, err2) -> { 2034 if (err2 != null) { 2035 future.completeExceptionally(err2); 2036 } else if (done) { 2037 future.complete(null); 2038 } else { 2039 // retry again after pauseTime. 2040 long pauseTime = 2041 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2042 pauseTime = Math.min(pauseTime, maxPauseTime); 2043 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS); 2044 } 2045 }); 2046 } else { 2047 future 2048 .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName() 2049 + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot)); 2050 } 2051 } 2052 }; 2053 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2054 } 2055 } 2056 2057 @Override 2058 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { 2059 return this.<Boolean> newMasterCaller() 2060 .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, 2061 Boolean> call(controller, stub, 2062 IsSnapshotDoneRequest.newBuilder() 2063 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2064 (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())) 2065 .call(); 2066 } 2067 2068 @Override 2069 public CompletableFuture<Void> restoreSnapshot(String snapshotName) { 2070 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean( 2071 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, 2072 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); 2073 return restoreSnapshot(snapshotName, takeFailSafeSnapshot); 2074 } 2075 2076 @Override 2077 public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot, 2078 boolean restoreAcl) { 2079 CompletableFuture<Void> future = new CompletableFuture<>(); 2080 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> { 2081 if (err != null) { 2082 future.completeExceptionally(err); 2083 return; 2084 } 2085 TableName tableName = null; 2086 if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { 2087 for (SnapshotDescription snap : snapshotDescriptions) { 2088 if (snap.getName().equals(snapshotName)) { 2089 tableName = snap.getTableName(); 2090 break; 2091 } 2092 } 2093 } 2094 if (tableName == null) { 2095 future.completeExceptionally(new RestoreSnapshotException( 2096 "Unable to find the table name for snapshot=" + snapshotName)); 2097 return; 2098 } 2099 final TableName finalTableName = tableName; 2100 addListener(tableExists(finalTableName), (exists, err2) -> { 2101 if (err2 != null) { 2102 future.completeExceptionally(err2); 2103 } else if (!exists) { 2104 // if table does not exist, then just 
clone snapshot into new table. 2105 completeConditionalOnFuture(future, 2106 internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null)); 2107 } else { 2108 addListener(isTableDisabled(finalTableName), (disabled, err4) -> { 2109 if (err4 != null) { 2110 future.completeExceptionally(err4); 2111 } else if (!disabled) { 2112 future.completeExceptionally(new TableNotDisabledException(finalTableName)); 2113 } else { 2114 completeConditionalOnFuture(future, 2115 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl)); 2116 } 2117 }); 2118 } 2119 }); 2120 }); 2121 return future; 2122 } 2123 2124 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, 2125 boolean takeFailSafeSnapshot, boolean restoreAcl) { 2126 if (takeFailSafeSnapshot) { 2127 CompletableFuture<Void> future = new CompletableFuture<>(); 2128 // Step.1 Take a snapshot of the current state 2129 String failSafeSnapshotSnapshotNameFormat = 2130 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, 2131 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); 2132 final String failSafeSnapshotSnapshotName = 2133 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) 2134 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) 2135 .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); 2136 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2137 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> { 2138 if (err != null) { 2139 future.completeExceptionally(err); 2140 } else { 2141 // Step.2 Restore snapshot 2142 addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null), 2143 (void2, err2) -> { 2144 if (err2 != null) { 2145 // Step.3.a Something went wrong during the restore and try to rollback. 2146 addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, 2147 restoreAcl, null), (void3, err3) -> { 2148 if (err3 != null) { 2149 future.completeExceptionally(err3); 2150 } else { 2151 String msg = 2152 "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" 2153 + failSafeSnapshotSnapshotName + " succeeded."; 2154 future.completeExceptionally(new RestoreSnapshotException(msg, err2)); 2155 } 2156 }); 2157 } else { 2158 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. 
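                    // (a failure to delete the failsafe snapshot is logged and also fails the
                    // returned future)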
2159 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2160 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> { 2161 if (err3 != null) { 2162 LOG.error( 2163 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, 2164 err3); 2165 future.completeExceptionally(err3); 2166 } else { 2167 future.complete(ret3); 2168 } 2169 }); 2170 } 2171 }); 2172 } 2173 }); 2174 return future; 2175 } else { 2176 return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null); 2177 } 2178 } 2179 2180 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, 2181 CompletableFuture<T> parentFuture) { 2182 addListener(parentFuture, (res, err) -> { 2183 if (err != null) { 2184 dependentFuture.completeExceptionally(err); 2185 } else { 2186 dependentFuture.complete(res); 2187 } 2188 }); 2189 } 2190 2191 @Override 2192 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName, 2193 boolean restoreAcl, String customSFT) { 2194 CompletableFuture<Void> future = new CompletableFuture<>(); 2195 addListener(tableExists(tableName), (exists, err) -> { 2196 if (err != null) { 2197 future.completeExceptionally(err); 2198 } else if (exists) { 2199 future.completeExceptionally(new TableExistsException(tableName)); 2200 } else { 2201 completeConditionalOnFuture(future, 2202 internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT)); 2203 } 2204 }); 2205 return future; 2206 } 2207 2208 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName, 2209 boolean restoreAcl, String customSFT) { 2210 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() 2211 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 2212 try { 2213 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2214 } catch (IllegalArgumentException e) { 2215 return failedFuture(e); 2216 } 2217 RestoreSnapshotRequest.Builder builder = 2218 RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()) 2219 .setNonce(ng.newNonce()).setRestoreACL(restoreAcl); 2220 if (customSFT != null) { 2221 builder.setCustomSFT(customSFT); 2222 } 2223 return waitProcedureResult(this.<Long> newMasterCaller() 2224 .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, 2225 Long> call(controller, stub, builder.build(), 2226 (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())) 2227 .call()); 2228 } 2229 2230 @Override 2231 public CompletableFuture<List<SnapshotDescription>> listSnapshots() { 2232 return getCompletedSnapshots(null); 2233 } 2234 2235 @Override 2236 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { 2237 Preconditions.checkNotNull(pattern, 2238 "pattern is null. 
If you don't specify a pattern, use listSnapshots() instead"); 2239 return getCompletedSnapshots(pattern); 2240 } 2241 2242 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) { 2243 return this.<List<SnapshotDescription>> newMasterCaller() 2244 .action((controller, stub) -> this.<GetCompletedSnapshotsRequest, 2245 GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub, 2246 GetCompletedSnapshotsRequest.newBuilder().build(), 2247 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), 2248 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern))) 2249 .call(); 2250 } 2251 2252 @Override 2253 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) { 2254 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2255 + " If you don't specify a tableNamePattern, use listSnapshots() instead"); 2256 return getCompletedSnapshots(tableNamePattern, null); 2257 } 2258 2259 @Override 2260 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, 2261 Pattern snapshotNamePattern) { 2262 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2263 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead"); 2264 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2265 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead"); 2266 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2267 } 2268 2269 private CompletableFuture<List<SnapshotDescription>> 2270 getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) { 2271 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); 2272 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { 2273 if (err != null) { 2274 future.completeExceptionally(err); 2275 return; 2276 } 2277 if (tableNames == null || tableNames.size() <= 0) { 2278 future.complete(Collections.emptyList()); 2279 return; 2280 } 2281 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { 2282 if (err2 != null) { 2283 future.completeExceptionally(err2); 2284 return; 2285 } 2286 if (snapshotDescList == null || snapshotDescList.isEmpty()) { 2287 future.complete(Collections.emptyList()); 2288 return; 2289 } 2290 future.complete(snapshotDescList.stream() 2291 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) 2292 .collect(Collectors.toList())); 2293 }); 2294 }); 2295 return future; 2296 } 2297 2298 @Override 2299 public CompletableFuture<Void> deleteSnapshot(String snapshotName) { 2300 return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); 2301 } 2302 2303 @Override 2304 public CompletableFuture<Void> deleteSnapshots() { 2305 return internalDeleteSnapshots(null, null); 2306 } 2307 2308 @Override 2309 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { 2310 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2311 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead"); 2312 return internalDeleteSnapshots(null, snapshotNamePattern); 2313 } 2314 2315 @Override 2316 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) { 2317 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 
2318 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead"); 2319 return internalDeleteSnapshots(tableNamePattern, null); 2320 } 2321 2322 @Override 2323 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, 2324 Pattern snapshotNamePattern) { 2325 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2326 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead"); 2327 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2328 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead"); 2329 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern); 2330 } 2331 2332 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern, 2333 Pattern snapshotNamePattern) { 2334 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture; 2335 if (tableNamePattern == null) { 2336 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern); 2337 } else { 2338 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2339 } 2340 CompletableFuture<Void> future = new CompletableFuture<>(); 2341 addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> { 2342 if (err != null) { 2343 future.completeExceptionally(err); 2344 return; 2345 } 2346 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { 2347 future.complete(null); 2348 return; 2349 } 2350 addListener(CompletableFuture.allOf(snapshotDescriptions.stream() 2351 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { 2352 if (e != null) { 2353 future.completeExceptionally(e); 2354 } else { 2355 future.complete(v); 2356 } 2357 }); 2358 }); 2359 return future; 2360 } 2361 2362 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { 2363 return this.<Void> newMasterCaller() 2364 .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( 2365 controller, stub, 2366 DeleteSnapshotRequest.newBuilder() 2367 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2368 (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null)) 2369 .call(); 2370 } 2371 2372 @Override 2373 public CompletableFuture<Void> execProcedure(String signature, String instance, 2374 Map<String, String> props) { 2375 CompletableFuture<Void> future = new CompletableFuture<>(); 2376 ProcedureDescription procDesc = 2377 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2378 addListener(this.<Long> newMasterCaller() 2379 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( 2380 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), 2381 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) 2382 .call(), (expectedTimeout, err) -> { 2383 if (err != null) { 2384 future.completeExceptionally(err); 2385 return; 2386 } 2387 TimerTask pollingTask = new TimerTask() { 2388 int tries = 0; 2389 long startTime = EnvironmentEdgeManager.currentTime(); 2390 long endTime = startTime + expectedTimeout; 2391 long maxPauseTime = expectedTimeout / maxAttempts; 2392 2393 @Override 2394 public void run(Timeout timeout) throws Exception { 2395 if (EnvironmentEdgeManager.currentTime() < endTime) { 2396 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { 2397 if (err2 != null) { 2398 future.completeExceptionally(err2); 2399 
return;
              }
              if (done) {
                future.complete(null);
              } else {
                // retry again after pauseTime.
                long pauseTime =
                  ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
                pauseTime = Math.min(pauseTime, maxPauseTime);
                // pauseTime is computed in milliseconds
                AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
                  TimeUnit.MILLISECONDS);
              }
            });
          } else {
            future.completeExceptionally(new IOException("Procedure '" + signature + " : "
              + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
          }
        }
      };
      // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
      AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
    });
    return future;
  }

  @Override
  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
    Map<String, String> props) {
    ProcedureDescription proDesc =
      ProtobufUtil.buildProcedureDescription(signature, instance, props);
    return this.<byte[]> newMasterCaller()
      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
        (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
        resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
      .call();
  }

  @Override
  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
    Map<String, String> props) {
    ProcedureDescription proDesc =
      ProtobufUtil.buildProcedureDescription(signature, instance, props);
    return this.<Boolean> newMasterCaller()
      .action(
        (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(
          controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
          (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
      .call();
  }

  @Override
  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
    return this.<Boolean> newMasterCaller().action(
      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
      .call();
  }

  @Override
  public CompletableFuture<String> getProcedures() {
    return this.<String> newMasterCaller()
      .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call(
        controller, stub, GetProceduresRequest.newBuilder().build(),
        (s, c, req, done) -> s.getProcedures(c, req, done),
        resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList())))
      .call();
  }

  @Override
  public CompletableFuture<String> getLocks() {
    return this.<String> newMasterCaller()
      .action(
        (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller,
          stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done),
          resp -> ProtobufUtil.toLockJson(resp.getLockList())))
      .call();
  }

  @Override
  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
    boolean offload) {
    return this.<Void> newMasterCaller()
      .action((controller, stub) -> this.<DecommissionRegionServersRequest,
        DecommissionRegionServersResponse, Void> call(controller, stub,
          RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
          (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
      .call();
  }

  @Override
  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
    return this.<List<ServerName>> newMasterCaller()
      .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest,
        ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub,
          ListDecommissionedRegionServersRequest.newBuilder().build(),
          (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
          resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
            .collect(Collectors.toList())))
      .call();
  }

  @Override
  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
    List<byte[]> encodedRegionNames) {
    return this.<Void> newMasterCaller()
      .action((controller, stub) -> this.<RecommissionRegionServerRequest,
        RecommissionRegionServerResponse, Void> call(controller, stub,
          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
          (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
      .call();
  }

  /**
   * Get the region location for the passed region name. The region name may be a full region name
   * or an encoded region name. If the region is not found, the returned {@link CompletableFuture}
   * is completed exceptionally with an UnknownRegionException.
   * @param regionNameOrEncodedRegionName region name or encoded region name
   * @return region location, wrapped by a {@link CompletableFuture}
   */
  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
    if (regionNameOrEncodedRegionName == null) {
      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
    }

    CompletableFuture<Optional<HRegionLocation>> future;
    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
        // old format encodedName, should be meta region
        future = connection.registry.getMetaRegionLocations()
          .thenApply(locs -> Stream.of(locs.getRegionLocations())
            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
      } else {
        future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
          regionNameOrEncodedRegionName);
      }
    } else {
      // Not every regionNameOrEncodedRegionName passed in here is a valid region name; we need to
      // throw an IllegalArgumentException if, for example, a table name is passed in.
2540 RegionInfo regionInfo; 2541 try { 2542 regionInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName); 2543 } catch (IOException ioe) { 2544 return failedFuture(new IllegalArgumentException(ioe.getMessage())); 2545 } 2546 2547 if (regionInfo.isMetaRegion()) { 2548 future = connection.registry.getMetaRegionLocations() 2549 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2550 .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId()) 2551 .findFirst()); 2552 } else { 2553 future = AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); 2554 } 2555 } 2556 2557 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); 2558 addListener(future, (location, err) -> { 2559 if (err != null) { 2560 returnedFuture.completeExceptionally(err); 2561 return; 2562 } 2563 if (!location.isPresent() || location.get().getRegion() == null) { 2564 returnedFuture.completeExceptionally( 2565 new UnknownRegionException("Invalid region name or encoded region name: " 2566 + Bytes.toStringBinary(regionNameOrEncodedRegionName))); 2567 } else { 2568 returnedFuture.complete(location.get()); 2569 } 2570 }); 2571 return returnedFuture; 2572 } 2573 2574 /** 2575 * Get the region info for the passed region name. The region name may be a full region name or 2576 * encoded region name. If the region does not found, then it'll throw an UnknownRegionException 2577 * wrapped by a {@link CompletableFuture} 2578 * @return region info, wrapped by a {@link CompletableFuture} 2579 */ 2580 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { 2581 if (regionNameOrEncodedRegionName == null) { 2582 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2583 } 2584 2585 if ( 2586 Bytes.equals(regionNameOrEncodedRegionName, 2587 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) 2588 || Bytes.equals(regionNameOrEncodedRegionName, 2589 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes()) 2590 ) { 2591 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); 2592 } 2593 2594 CompletableFuture<RegionInfo> future = new CompletableFuture<>(); 2595 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { 2596 if (err != null) { 2597 future.completeExceptionally(err); 2598 } else { 2599 future.complete(location.getRegion()); 2600 } 2601 }); 2602 return future; 2603 } 2604 2605 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { 2606 if (numRegions < 3) { 2607 throw new IllegalArgumentException("Must create at least three regions"); 2608 } else if (Bytes.compareTo(startKey, endKey) >= 0) { 2609 throw new IllegalArgumentException("Start key must be smaller than end key"); 2610 } 2611 if (numRegions == 3) { 2612 return new byte[][] { startKey, endKey }; 2613 } 2614 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 2615 if (splitKeys == null || splitKeys.length != numRegions - 1) { 2616 throw new IllegalArgumentException("Unable to split key range into enough regions"); 2617 } 2618 return splitKeys; 2619 } 2620 2621 private void verifySplitKeys(byte[][] splitKeys) { 2622 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 2623 // Verify there are no duplicate split keys 2624 byte[] lastKey = null; 2625 for (byte[] splitKey : splitKeys) { 2626 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { 2627 throw new IllegalArgumentException("Empty split key 
must not be passed in the split keys."); 2628 } 2629 if (lastKey != null && Bytes.equals(splitKey, lastKey)) { 2630 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: " 2631 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey)); 2632 } 2633 lastKey = splitKey; 2634 } 2635 } 2636 2637 private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> { 2638 2639 abstract void onFinished(); 2640 2641 abstract void onError(Throwable error); 2642 2643 @Override 2644 public void accept(Void v, Throwable error) { 2645 if (error != null) { 2646 onError(error); 2647 return; 2648 } 2649 onFinished(); 2650 } 2651 } 2652 2653 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { 2654 protected final TableName tableName; 2655 2656 TableProcedureBiConsumer(TableName tableName) { 2657 this.tableName = tableName; 2658 } 2659 2660 abstract String getOperationType(); 2661 2662 String getDescription() { 2663 return "Operation: " + getOperationType() + ", " + "Table Name: " 2664 + tableName.getNameWithNamespaceInclAsString(); 2665 } 2666 2667 @Override 2668 void onFinished() { 2669 LOG.info(getDescription() + " completed"); 2670 } 2671 2672 @Override 2673 void onError(Throwable error) { 2674 LOG.info(getDescription() + " failed with " + error.getMessage()); 2675 } 2676 } 2677 2678 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { 2679 protected final String namespaceName; 2680 2681 NamespaceProcedureBiConsumer(String namespaceName) { 2682 this.namespaceName = namespaceName; 2683 } 2684 2685 abstract String getOperationType(); 2686 2687 String getDescription() { 2688 return "Operation: " + getOperationType() + ", Namespace: " + namespaceName; 2689 } 2690 2691 @Override 2692 void onFinished() { 2693 LOG.info(getDescription() + " completed"); 2694 } 2695 2696 @Override 2697 void onError(Throwable error) { 2698 LOG.info(getDescription() + " failed with " + error.getMessage()); 2699 } 2700 } 2701 2702 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2703 2704 CreateTableProcedureBiConsumer(TableName tableName) { 2705 super(tableName); 2706 } 2707 2708 @Override 2709 String getOperationType() { 2710 return "CREATE"; 2711 } 2712 } 2713 2714 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { 2715 2716 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2717 super(tableName); 2718 } 2719 2720 @Override 2721 String getOperationType() { 2722 return "MODIFY"; 2723 } 2724 } 2725 2726 private static class ModifyTableStoreFileTrackerProcedureBiConsumer 2727 extends TableProcedureBiConsumer { 2728 2729 ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2730 super(tableName); 2731 } 2732 2733 @Override 2734 String getOperationType() { 2735 return "MODIFY_TABLE_STORE_FILE_TRACKER"; 2736 } 2737 } 2738 2739 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { 2740 2741 DeleteTableProcedureBiConsumer(TableName tableName) { 2742 super(tableName); 2743 } 2744 2745 @Override 2746 String getOperationType() { 2747 return "DELETE"; 2748 } 2749 2750 @Override 2751 void onFinished() { 2752 connection.getLocator().clearCache(this.tableName); 2753 super.onFinished(); 2754 } 2755 } 2756 2757 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2758 2759 TruncateTableProcedureBiConsumer(TableName 
tableName) { 2760 super(tableName); 2761 } 2762 2763 @Override 2764 String getOperationType() { 2765 return "TRUNCATE"; 2766 } 2767 } 2768 2769 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2770 2771 EnableTableProcedureBiConsumer(TableName tableName) { 2772 super(tableName); 2773 } 2774 2775 @Override 2776 String getOperationType() { 2777 return "ENABLE"; 2778 } 2779 } 2780 2781 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2782 2783 DisableTableProcedureBiConsumer(TableName tableName) { 2784 super(tableName); 2785 } 2786 2787 @Override 2788 String getOperationType() { 2789 return "DISABLE"; 2790 } 2791 } 2792 2793 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2794 2795 AddColumnFamilyProcedureBiConsumer(TableName tableName) { 2796 super(tableName); 2797 } 2798 2799 @Override 2800 String getOperationType() { 2801 return "ADD_COLUMN_FAMILY"; 2802 } 2803 } 2804 2805 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2806 2807 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) { 2808 super(tableName); 2809 } 2810 2811 @Override 2812 String getOperationType() { 2813 return "DELETE_COLUMN_FAMILY"; 2814 } 2815 } 2816 2817 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2818 2819 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) { 2820 super(tableName); 2821 } 2822 2823 @Override 2824 String getOperationType() { 2825 return "MODIFY_COLUMN_FAMILY"; 2826 } 2827 } 2828 2829 private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer 2830 extends TableProcedureBiConsumer { 2831 2832 ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) { 2833 super(tableName); 2834 } 2835 2836 @Override 2837 String getOperationType() { 2838 return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER"; 2839 } 2840 } 2841 2842 private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer { 2843 2844 FlushTableProcedureBiConsumer(TableName tableName) { 2845 super(tableName); 2846 } 2847 2848 @Override 2849 String getOperationType() { 2850 return "FLUSH"; 2851 } 2852 } 2853 2854 private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2855 2856 CreateNamespaceProcedureBiConsumer(String namespaceName) { 2857 super(namespaceName); 2858 } 2859 2860 @Override 2861 String getOperationType() { 2862 return "CREATE_NAMESPACE"; 2863 } 2864 } 2865 2866 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2867 2868 DeleteNamespaceProcedureBiConsumer(String namespaceName) { 2869 super(namespaceName); 2870 } 2871 2872 @Override 2873 String getOperationType() { 2874 return "DELETE_NAMESPACE"; 2875 } 2876 } 2877 2878 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2879 2880 ModifyNamespaceProcedureBiConsumer(String namespaceName) { 2881 super(namespaceName); 2882 } 2883 2884 @Override 2885 String getOperationType() { 2886 return "MODIFY_NAMESPACE"; 2887 } 2888 } 2889 2890 private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2891 2892 MergeTableRegionProcedureBiConsumer(TableName tableName) { 2893 super(tableName); 2894 } 2895 2896 @Override 2897 String getOperationType() { 2898 return "MERGE_REGIONS"; 2899 } 2900 } 2901 2902 private static class SplitTableRegionProcedureBiConsumer extends 
TableProcedureBiConsumer { 2903 2904 SplitTableRegionProcedureBiConsumer(TableName tableName) { 2905 super(tableName); 2906 } 2907 2908 @Override 2909 String getOperationType() { 2910 return "SPLIT_REGION"; 2911 } 2912 } 2913 2914 private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2915 2916 TruncateRegionProcedureBiConsumer(TableName tableName) { 2917 super(tableName); 2918 } 2919 2920 @Override 2921 String getOperationType() { 2922 return "TRUNCATE_REGION"; 2923 } 2924 } 2925 2926 private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer { 2927 SnapshotProcedureBiConsumer(TableName tableName) { 2928 super(tableName); 2929 } 2930 2931 @Override 2932 String getOperationType() { 2933 return "SNAPSHOT"; 2934 } 2935 } 2936 2937 private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { 2938 private final String peerId; 2939 private final Supplier<String> getOperation; 2940 2941 ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) { 2942 this.peerId = peerId; 2943 this.getOperation = getOperation; 2944 } 2945 2946 String getDescription() { 2947 return "Operation: " + getOperation.get() + ", peerId: " + peerId; 2948 } 2949 2950 @Override 2951 void onFinished() { 2952 LOG.info(getDescription() + " completed"); 2953 } 2954 2955 @Override 2956 void onError(Throwable error) { 2957 LOG.info(getDescription() + " failed with " + error.getMessage()); 2958 } 2959 } 2960 2961 private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) { 2962 CompletableFuture<Void> future = new CompletableFuture<>(); 2963 addListener(procFuture, (procId, error) -> { 2964 if (error != null) { 2965 future.completeExceptionally(error); 2966 return; 2967 } 2968 getProcedureResult(procId, future, 0); 2969 }); 2970 return future; 2971 } 2972 2973 private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) { 2974 addListener( 2975 this.<GetProcedureResultResponse> newMasterCaller() 2976 .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse, 2977 GetProcedureResultResponse> call(controller, stub, 2978 GetProcedureResultRequest.newBuilder().setProcId(procId).build(), 2979 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) 2980 .call(), 2981 (response, error) -> { 2982 if (error != null) { 2983 LOG.warn("failed to get the procedure result procId={}", procId, 2984 ConnectionUtils.translateException(error)); 2985 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2986 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2987 return; 2988 } 2989 if (response.getState() == GetProcedureResultResponse.State.RUNNING) { 2990 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2991 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2992 return; 2993 } 2994 if (response.hasException()) { 2995 IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); 2996 future.completeExceptionally(ioe); 2997 } else { 2998 future.complete(null); 2999 } 3000 }); 3001 } 3002 3003 private <T> CompletableFuture<T> failedFuture(Throwable error) { 3004 CompletableFuture<T> future = new CompletableFuture<>(); 3005 future.completeExceptionally(error); 3006 return future; 3007 } 3008 3009 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { 3010 if (error != null) { 3011 
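      // Hand the error to the caller-facing future; the "true" returned below tells callers that
      // the future has already been completed and their normal completion path can be skipped.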
future.completeExceptionally(error); 3012 return true; 3013 } 3014 return false; 3015 } 3016 3017 @Override 3018 public CompletableFuture<ClusterMetrics> getClusterMetrics() { 3019 return getClusterMetrics(EnumSet.allOf(Option.class)); 3020 } 3021 3022 @Override 3023 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) { 3024 return this.<ClusterMetrics> newMasterCaller() 3025 .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse, 3026 ClusterMetrics> call(controller, stub, 3027 RequestConverter.buildGetClusterStatusRequest(options), 3028 (s, c, req, done) -> s.getClusterStatus(c, req, done), 3029 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))) 3030 .call(); 3031 } 3032 3033 @Override 3034 public CompletableFuture<Void> shutdown() { 3035 return this.<Void> newMasterCaller().priority(HIGH_QOS) 3036 .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller, 3037 stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done), 3038 resp -> null)) 3039 .call(); 3040 } 3041 3042 @Override 3043 public CompletableFuture<Void> stopMaster() { 3044 return this.<Void> newMasterCaller().priority(HIGH_QOS) 3045 .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call( 3046 controller, stub, StopMasterRequest.newBuilder().build(), 3047 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)) 3048 .call(); 3049 } 3050 3051 @Override 3052 public CompletableFuture<Void> stopRegionServer(ServerName serverName) { 3053 StopServerRequest request = RequestConverter 3054 .buildStopServerRequest("Called by admin client " + this.connection.toString()); 3055 return this.<Void> newAdminCaller().priority(HIGH_QOS) 3056 .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall( 3057 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), 3058 resp -> null)) 3059 .serverName(serverName).call(); 3060 } 3061 3062 @Override 3063 public CompletableFuture<Void> updateConfiguration(ServerName serverName) { 3064 return this.<Void> newAdminCaller() 3065 .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse, 3066 Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(), 3067 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) 3068 .serverName(serverName).call(); 3069 } 3070 3071 @Override 3072 public CompletableFuture<Void> updateConfiguration() { 3073 CompletableFuture<Void> future = new CompletableFuture<Void>(); 3074 addListener( 3075 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)), 3076 (status, err) -> { 3077 if (err != null) { 3078 future.completeExceptionally(err); 3079 } else { 3080 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3081 status.getServersName().forEach(server -> futures.add(updateConfiguration(server))); 3082 futures.add(updateConfiguration(status.getMasterName())); 3083 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); 3084 addListener( 3085 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3086 (result, err2) -> { 3087 if (err2 != null) { 3088 future.completeExceptionally(err2); 3089 } else { 3090 future.complete(result); 3091 } 3092 }); 3093 } 3094 }); 3095 return future; 3096 } 3097 3098 @Override 3099 public CompletableFuture<Void> 
 rollWALWriter(ServerName serverName) {
    return this.<Void> newAdminCaller()
      .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse,
        Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(),
          (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
      .serverName(serverName).call();
  }

  @Override
  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
    return this.<Void> newAdminCaller()
      .action((controller, stub) -> this.<ClearCompactionQueuesRequest,
        ClearCompactionQueuesResponse, Void> adminCall(controller, stub,
          RequestConverter.buildClearCompactionQueuesRequest(queues),
          (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
      .serverName(serverName).call();
  }

  @Override
  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
    return this.<List<SecurityCapability>> newMasterCaller()
      .action((controller, stub) -> this.<SecurityCapabilitiesRequest,
        SecurityCapabilitiesResponse, List<SecurityCapability>> call(controller, stub,
          SecurityCapabilitiesRequest.newBuilder().build(),
          (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
          (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
      .call();
  }

  @Override
  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
  }

  @Override
  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
    TableName tableName) {
    Preconditions.checkNotNull(tableName, "tableName is null.
If you don't specify a tableName, use getRegionLoads() instead"); 3138 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName); 3139 } 3140 3141 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request, 3142 ServerName serverName) { 3143 return this.<List<RegionMetrics>> newAdminCaller() 3144 .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse, 3145 List<RegionMetrics>> adminCall(controller, stub, request, 3146 (s, c, req, done) -> s.getRegionLoad(controller, req, done), 3147 RegionMetricsBuilder::toRegionMetrics)) 3148 .serverName(serverName).call(); 3149 } 3150 3151 @Override 3152 public CompletableFuture<Boolean> isMasterInMaintenanceMode() { 3153 return this.<Boolean> newMasterCaller() 3154 .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, 3155 Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(), 3156 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done), 3157 resp -> resp.getInMaintenanceMode())) 3158 .call(); 3159 } 3160 3161 @Override 3162 public CompletableFuture<CompactionState> getCompactionState(TableName tableName, 3163 CompactType compactType) { 3164 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3165 3166 switch (compactType) { 3167 case MOB: 3168 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 3169 if (err != null) { 3170 future.completeExceptionally(err); 3171 return; 3172 } 3173 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 3174 3175 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName) 3176 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3177 GetRegionInfoResponse> adminCall(controller, stub, 3178 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), 3179 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3180 .call(), (resp2, err2) -> { 3181 if (err2 != null) { 3182 future.completeExceptionally(err2); 3183 } else { 3184 if (resp2.hasCompactionState()) { 3185 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3186 } else { 3187 future.complete(CompactionState.NONE); 3188 } 3189 } 3190 }); 3191 }); 3192 break; 3193 case NORMAL: 3194 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3195 if (err != null) { 3196 future.completeExceptionally(err); 3197 return; 3198 } 3199 ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>(); 3200 List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); 3201 locations.stream().filter(loc -> loc.getServerName() != null) 3202 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline()) 3203 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { 3204 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { 3205 // If any region compaction state is MAJOR_AND_MINOR 3206 // the table compaction state is MAJOR_AND_MINOR, too. 
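              // Otherwise each region's state is collected and merged once every per-region call
              // has finished: seeing MAJOR on one region and MINOR on another also yields
              // MAJOR_AND_MINOR, while a single MAJOR or MINOR wins over NONE (see the allOf
              // listener below).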
3207 if (err2 != null) { 3208 future.completeExceptionally(unwrapCompletionException(err2)); 3209 } else if (regionState == CompactionState.MAJOR_AND_MINOR) { 3210 future.complete(regionState); 3211 } else { 3212 regionStates.add(regionState); 3213 } 3214 })); 3215 }); 3216 addListener( 3217 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3218 (ret, err3) -> { 3219 // If future not completed, check all regions's compaction state 3220 if (!future.isCompletedExceptionally() && !future.isDone()) { 3221 CompactionState state = CompactionState.NONE; 3222 for (CompactionState regionState : regionStates) { 3223 switch (regionState) { 3224 case MAJOR: 3225 if (state == CompactionState.MINOR) { 3226 future.complete(CompactionState.MAJOR_AND_MINOR); 3227 } else { 3228 state = CompactionState.MAJOR; 3229 } 3230 break; 3231 case MINOR: 3232 if (state == CompactionState.MAJOR) { 3233 future.complete(CompactionState.MAJOR_AND_MINOR); 3234 } else { 3235 state = CompactionState.MINOR; 3236 } 3237 break; 3238 case NONE: 3239 default: 3240 } 3241 } 3242 if (!future.isDone()) { 3243 future.complete(state); 3244 } 3245 } 3246 }); 3247 }); 3248 break; 3249 default: 3250 throw new IllegalArgumentException("Unknown compactType: " + compactType); 3251 } 3252 3253 return future; 3254 } 3255 3256 @Override 3257 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { 3258 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3259 addListener(getRegionLocation(regionName), (location, err) -> { 3260 if (err != null) { 3261 future.completeExceptionally(err); 3262 return; 3263 } 3264 ServerName serverName = location.getServerName(); 3265 if (serverName == null) { 3266 future 3267 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 3268 return; 3269 } 3270 addListener( 3271 this.<GetRegionInfoResponse> newAdminCaller() 3272 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3273 GetRegionInfoResponse> adminCall(controller, stub, 3274 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(), 3275 true), 3276 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3277 .serverName(serverName).call(), 3278 (resp2, err2) -> { 3279 if (err2 != null) { 3280 future.completeExceptionally(err2); 3281 } else { 3282 if (resp2.hasCompactionState()) { 3283 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3284 } else { 3285 future.complete(CompactionState.NONE); 3286 } 3287 } 3288 }); 3289 }); 3290 return future; 3291 } 3292 3293 @Override 3294 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { 3295 MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder() 3296 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 3297 return this.<Optional<Long>> newMasterCaller() 3298 .action((controller, stub) -> this.<MajorCompactionTimestampRequest, 3299 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request, 3300 (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done), 3301 ProtobufUtil::toOptionalTimestamp)) 3302 .call(); 3303 } 3304 3305 @Override 3306 public CompletableFuture<Optional<Long>> 3307 getLastMajorCompactionTimestampForRegion(byte[] regionName) { 3308 CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); 3309 // regionName may be a full region name or encoded region 
name, so getRegionInfo(byte[]) first 3310 addListener(getRegionInfo(regionName), (region, err) -> { 3311 if (err != null) { 3312 future.completeExceptionally(err); 3313 return; 3314 } 3315 MajorCompactionTimestampForRegionRequest.Builder builder = 3316 MajorCompactionTimestampForRegionRequest.newBuilder(); 3317 builder.setRegion( 3318 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); 3319 addListener(this.<Optional<Long>> newMasterCaller() 3320 .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest, 3321 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(), 3322 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done), 3323 ProtobufUtil::toOptionalTimestamp)) 3324 .call(), (timestamp, err2) -> { 3325 if (err2 != null) { 3326 future.completeExceptionally(err2); 3327 } else { 3328 future.complete(timestamp); 3329 } 3330 }); 3331 }); 3332 return future; 3333 } 3334 3335 @Override 3336 public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState, 3337 List<String> serverNamesList) { 3338 CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>(); 3339 addListener(getRegionServerList(serverNamesList), (serverNames, err) -> { 3340 if (err != null) { 3341 future.completeExceptionally(err); 3342 return; 3343 } 3344 // Accessed by multiple threads. 3345 Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size()); 3346 List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size()); 3347 serverNames.stream().forEach(serverName -> { 3348 futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { 3349 if (err2 != null) { 3350 future.completeExceptionally(unwrapCompletionException(err2)); 3351 } else { 3352 serverStates.put(serverName, serverState); 3353 } 3354 })); 3355 }); 3356 addListener( 3357 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3358 (ret, err3) -> { 3359 if (!future.isCompletedExceptionally()) { 3360 if (err3 != null) { 3361 future.completeExceptionally(err3); 3362 } else { 3363 future.complete(serverStates); 3364 } 3365 } 3366 }); 3367 }); 3368 return future; 3369 } 3370 3371 private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) { 3372 CompletableFuture<List<ServerName>> future = new CompletableFuture<>(); 3373 if (serverNamesList.isEmpty()) { 3374 CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture = 3375 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)); 3376 addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> { 3377 if (err != null) { 3378 future.completeExceptionally(err); 3379 } else { 3380 future.complete(clusterMetrics.getServersName()); 3381 } 3382 }); 3383 return future; 3384 } else { 3385 List<ServerName> serverList = new ArrayList<>(); 3386 for (String regionServerName : serverNamesList) { 3387 ServerName serverName = null; 3388 try { 3389 serverName = ServerName.valueOf(regionServerName); 3390 } catch (Exception e) { 3391 future.completeExceptionally( 3392 new IllegalArgumentException(String.format("ServerName format: %s", regionServerName))); 3393 } 3394 if (serverName == null) { 3395 future.completeExceptionally( 3396 new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName))); 3397 } else { 3398 serverList.add(serverName); 3399 } 3400 } 3401 future.complete(serverList); 3402 } 3403 return future; 3404 } 3405 
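  // A minimal usage sketch (illustrative only; the admin handle and server names below are
  // assumptions, not part of this file): disabling compactions on two region servers and logging
  // each server's previous switch state could look like
  //
  //   AsyncAdmin admin = ...;
  //   admin.compactionSwitch(false,
  //     Arrays.asList("rs1.example.com,16020,1596543568090",
  //       "rs2.example.com,16020,1596543568090"))
  //     .thenAccept(prevStates -> prevStates.forEach(
  //       (sn, wasEnabled) -> LOG.info("{} compaction previously enabled: {}", sn, wasEnabled)));
  //
  // Passing an empty list targets every region server known to the cluster, as implemented in
  // getRegionServerList above.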
3406 private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) { 3407 return this.<Boolean> newAdminCaller().serverName(serverName) 3408 .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse, 3409 Boolean> adminCall(controller, stub, 3410 CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), 3411 (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState())) 3412 .call(); 3413 } 3414 3415 @Override 3416 public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) { 3417 return this.<Boolean> newMasterCaller() 3418 .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse, 3419 Boolean> call(controller, stub, 3420 RequestConverter.buildSetBalancerRunningRequest(on, drainRITs), 3421 (s, c, req, done) -> s.setBalancerRunning(c, req, done), 3422 (resp) -> resp.getPrevBalanceValue())) 3423 .call(); 3424 } 3425 3426 @Override 3427 public CompletableFuture<BalanceResponse> balance(BalanceRequest request) { 3428 return this.<BalanceResponse> newMasterCaller() 3429 .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse, 3430 BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request), 3431 (s, c, req, done) -> s.balance(c, req, done), 3432 (resp) -> ProtobufUtil.toBalanceResponse(resp))) 3433 .call(); 3434 } 3435 3436 @Override 3437 public CompletableFuture<Boolean> isBalancerEnabled() { 3438 return this.<Boolean> newMasterCaller() 3439 .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, 3440 Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), 3441 (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3442 .call(); 3443 } 3444 3445 @Override 3446 public CompletableFuture<Boolean> normalizerSwitch(boolean on) { 3447 return this.<Boolean> newMasterCaller() 3448 .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse, 3449 Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), 3450 (s, c, req, done) -> s.setNormalizerRunning(c, req, done), 3451 (resp) -> resp.getPrevNormalizerValue())) 3452 .call(); 3453 } 3454 3455 @Override 3456 public CompletableFuture<Boolean> isNormalizerEnabled() { 3457 return this.<Boolean> newMasterCaller() 3458 .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, 3459 Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(), 3460 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3461 .call(); 3462 } 3463 3464 @Override 3465 public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) { 3466 return normalize(RequestConverter.buildNormalizeRequest(ntfp)); 3467 } 3468 3469 private CompletableFuture<Boolean> normalize(NormalizeRequest request) { 3470 return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub, 3471 request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call(); 3472 } 3473 3474 @Override 3475 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) { 3476 return this.<Boolean> newMasterCaller() 3477 .action((controller, stub) -> this.<SetCleanerChoreRunningRequest, 3478 SetCleanerChoreRunningResponse, Boolean> call(controller, stub, 3479 RequestConverter.buildSetCleanerChoreRunningRequest(enabled), 3480 
(s, c, req, done) -> s.setCleanerChoreRunning(c, req, done), 3481 (resp) -> resp.getPrevValue())) 3482 .call(); 3483 } 3484 3485 @Override 3486 public CompletableFuture<Boolean> isCleanerChoreEnabled() { 3487 return this.<Boolean> newMasterCaller() 3488 .action( 3489 (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, 3490 Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), 3491 (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue())) 3492 .call(); 3493 } 3494 3495 @Override 3496 public CompletableFuture<Boolean> runCleanerChore() { 3497 return this.<Boolean> newMasterCaller() 3498 .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse, 3499 Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(), 3500 (s, c, req, done) -> s.runCleanerChore(c, req, done), 3501 (resp) -> resp.getCleanerChoreRan())) 3502 .call(); 3503 } 3504 3505 @Override 3506 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) { 3507 return this.<Boolean> newMasterCaller() 3508 .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, 3509 Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), 3510 (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue())) 3511 .call(); 3512 } 3513 3514 @Override 3515 public CompletableFuture<Boolean> isCatalogJanitorEnabled() { 3516 return this.<Boolean> newMasterCaller() 3517 .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest, 3518 IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub, 3519 RequestConverter.buildIsCatalogJanitorEnabledRequest(), 3520 (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue())) 3521 .call(); 3522 } 3523 3524 @Override 3525 public CompletableFuture<Integer> runCatalogJanitor() { 3526 return this.<Integer> newMasterCaller() 3527 .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, 3528 Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(), 3529 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult())) 3530 .call(); 3531 } 3532 3533 @Override 3534 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3535 ServiceCaller<S, R> callable) { 3536 MasterCoprocessorRpcChannelImpl channel = 3537 new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller()); 3538 S stub = stubMaker.apply(channel); 3539 CompletableFuture<R> future = new CompletableFuture<>(); 3540 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3541 callable.call(stub, controller, resp -> { 3542 if (controller.failed()) { 3543 future.completeExceptionally(controller.getFailed()); 3544 } else { 3545 future.complete(resp); 3546 } 3547 }); 3548 return future; 3549 } 3550 3551 @Override 3552 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3553 ServiceCaller<S, R> callable, ServerName serverName) { 3554 RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl( 3555 this.<Message> newServerCaller().serverName(serverName)); 3556 S stub = stubMaker.apply(channel); 3557 CompletableFuture<R> future = new CompletableFuture<>(); 3558 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3559 callable.call(stub, controller, resp -> 
 {
      if (controller.failed()) {
        future.completeExceptionally(controller.getFailed());
      } else {
        future.complete(resp);
      }
    });
    return future;
  }

  @Override
  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
    return this.<List<ServerName>> newMasterCaller()
      .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse,
        List<ServerName>> call(controller, stub,
          RequestConverter.buildClearDeadServersRequest(servers),
          (s, c, req, done) -> s.clearDeadServers(c, req, done),
          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
      .call();
  }

  private <T> ServerRequestCallerBuilder<T> newServerCaller() {
    return this.connection.callerFactory.<T> serverRequest()
      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
      .pause(pauseNs, TimeUnit.NANOSECONDS)
      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
  }

  @Override
  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
    if (tableName == null) {
      return failedFuture(new IllegalArgumentException("Table name is null"));
    }
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(tableExists(tableName), (exist, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      if (!exist) {
        future.completeExceptionally(new TableNotFoundException(
          "Table '" + tableName.getNameAsString() + "' does not exist."));
        return;
      }
      addListener(getTableSplits(tableName), (splits, err1) -> {
        if (err1 != null) {
          future.completeExceptionally(err1);
        } else {
          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
            if (err2 != null) {
              future.completeExceptionally(err2);
            } else {
              addListener(setTableReplication(tableName, true), (result3, err3) -> {
                if (err3 != null) {
                  future.completeExceptionally(err3);
                } else {
                  future.complete(result3);
                }
              });
            }
          });
        }
      });
    });
    return future;
  }

  @Override
  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
    if (tableName == null) {
      return failedFuture(new IllegalArgumentException("Table name is null"));
    }
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(tableExists(tableName), (exist, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      if (!exist) {
        future.completeExceptionally(new TableNotFoundException(
          "Table '" + tableName.getNameAsString() + "' does not exist."));
        return;
      }
      addListener(setTableReplication(tableName, false), (result, err2) -> {
        if (err2 != null) {
          future.completeExceptionally(err2);
        } else {
          future.complete(result);
        }
      });
    });
    return future;
  }

  @Override
  public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
    GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
    request.setPeerId(peerId);
    return this.<Boolean> newMasterCaller()
      .action((controller, stub) -> this.<GetReplicationPeerStateRequest,
        GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
          (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
          resp -> resp.getIsEnabled()))
      .call();
  }

  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
    CompletableFuture<byte[][]> future = new CompletableFuture<>();
    addListener(
      getRegions(tableName).thenApply(regions -> regions.stream()
        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
      (regions, err2) -> {
        if (err2 != null) {
          future.completeExceptionally(err2);
          return;
        }
        if (regions.size() == 1) {
          future.complete(null);
        } else {
          byte[][] splits = new byte[regions.size() - 1][];
          for (int i = 1; i < regions.size(); i++) {
            splits[i - 1] = regions.get(i).getStartKey();
          }
          future.complete(splits);
        }
      });
    return future;
  }

  /**
   * Connect to the peer cluster and check the table descriptor on the peer:
   * <ol>
   * <li>Create the same table on the peer when it does not exist.</li>
   * <li>Throw an exception if the table already has replication enabled on any of the column
   * families.</li>
   * <li>Throw an exception if the table exists on the peer cluster but the descriptors are not the
   * same.</li>
   * </ol>
   * @param tableName name of the table to sync to the peer
   * @param splits table split keys
   */
  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
    byte[][] splits) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(listReplicationPeers(), (peers, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      if (peers == null || peers.size() <= 0) {
        future.completeExceptionally(
          new IllegalArgumentException("Found no peer cluster for replication."));
        return;
      }
      List<CompletableFuture<Void>> futures = new ArrayList<>();
      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
        .forEach(peer -> {
          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
        });
      addListener(
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
        (result, err2) -> {
          if (err2 != null) {
            future.completeExceptionally(err2);
          } else {
            future.complete(result);
          }
        });
    });
    return future;
  }

  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
    ReplicationPeerDescription peer) {
    Configuration peerConf = null;
    try {
      peerConf =
        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
    } catch (IOException e) {
      return failedFuture(e);
    }
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
        if (err1 != null) {
          future.completeExceptionally(err1);
          return;
        }
        AsyncAdmin peerAdmin = conn.getAdmin();
        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
          if (err2 != null) {
            future.completeExceptionally(err2);
            return;
          }
          if (!exist) {
            CompletableFuture<Void> createTableFuture = null;
            if (splits == null) {
              createTableFuture = peerAdmin.createTable(tableDesc);
            } else {
              createTableFuture = peerAdmin.createTable(tableDesc, splits);
            }
            addListener(createTableFuture, (result, err3) -> {
              if (err3 != null) {
                future.completeExceptionally(err3);
              } else {
                future.complete(result);
              }
            });
          } else {
            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
              (result, err4) -> {
                if (err4 != null) {
                  future.completeExceptionally(err4);
                } else {
                  future.complete(result);
                }
              });
          }
        });
      });
    });
    return future;
  }

  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
    TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      if (peerTableDesc == null) {
        future.completeExceptionally(
          new IllegalArgumentException("Failed to get table descriptor for table "
            + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
        return;
      }
      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
        future.completeExceptionally(new IllegalArgumentException(
          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
            + ", but the table descriptors are not the same as in the source cluster,"
            + " so the table's replication switch cannot be enabled."));
        return;
      }
      future.complete(null);
    });
    return future;
  }

  /**
   * Set the table's replication switch if it is not already set to the desired value.
   * @param tableName name of the table
   * @param enableRep whether replication should be enabled or disabled
   */
  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(getDescriptor(tableName), (tableDesc, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      if (!tableDesc.matchReplicationScope(enableRep)) {
        int scope =
          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
        TableDescriptor newTableDesc =
          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
        addListener(modifyTable(newTableDesc), (result, err2) -> {
          if (err2 != null) {
            future.completeExceptionally(err2);
          } else {
            future.complete(result);
          }
        });
      } else {
        future.complete(null);
      }
    });
    return future;
  }

  private void waitUntilAllReplicationPeerModificationProceduresDone(
    CompletableFuture<Boolean> future, boolean prevOn, int retries) {
    CompletableFuture<List<ProcedureProtos.Procedure>> callFuture =
      this.<List<ProcedureProtos.Procedure>> newMasterCaller()
        .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest,
          GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
            controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
            (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done),
            resp -> resp.getProcedureList()))
        .call();
    addListener(callFuture, (r, e) -> {
      if (e != null) {
        future.completeExceptionally(e);
      } else if (r.isEmpty()) {
        // we are done
        future.complete(prevOn);
      } else {
        // retry later to see if the procedures are done
        retryTimer.newTimeout(
          t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1),
          ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
      }
    });
  }

  @Override
  public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
    boolean drainProcedures) {
    ReplicationPeerModificationSwitchRequest request =
      ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
    CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller()
      .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest,
        ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request,
          (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done),
          resp -> resp.getPreviousValue()))
      .call();
    // if we do not need to wait for all previous peer modification procedures to finish, or we
    // are enabling peer modification, just return here.
    if (!drainProcedures || on) {
      return callFuture;
    }
    // otherwise we need to wait until all previous peer modification procedures are done
    CompletableFuture<Boolean> future = new CompletableFuture<>();
    addListener(callFuture, (prevOn, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      // even if the previous state is disabled, we still need to wait here, as there could be
      // another client thread which called this method just before us and has already changed the
      // state to off, but there are still peer modification procedures not finished, so we should
      // also wait here.
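      // The call below polls getReplicationPeerModificationProcedures through the retry timer,
      // backing off between attempts, and only completes the returned future (with the previous
      // switch state) once the master reports no outstanding peer modification procedures.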
3897 waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0); 3898 }); 3899 return future; 3900 } 3901 3902 @Override 3903 public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() { 3904 return this.<Boolean> newMasterCaller() 3905 .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest, 3906 IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub, 3907 IsReplicationPeerModificationEnabledRequest.getDefaultInstance(), 3908 (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done), 3909 (resp) -> resp.getEnabled())) 3910 .call(); 3911 } 3912 3913 @Override 3914 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) { 3915 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>(); 3916 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3917 if (err != null) { 3918 future.completeExceptionally(err); 3919 return; 3920 } 3921 Map<ServerName, List<RegionInfo>> regionInfoByServerName = 3922 locations.stream().filter(l -> l.getRegion() != null) 3923 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 3924 .collect(Collectors.groupingBy(l -> l.getServerName(), 3925 Collectors.mapping(l -> l.getRegion(), Collectors.toList()))); 3926 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>(); 3927 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator(); 3928 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) { 3929 futures 3930 .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> { 3931 if (err2 != null) { 3932 future.completeExceptionally(unwrapCompletionException(err2)); 3933 } else { 3934 aggregator.append(stats); 3935 } 3936 })); 3937 } 3938 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 3939 (ret, err3) -> { 3940 if (err3 != null) { 3941 future.completeExceptionally(unwrapCompletionException(err3)); 3942 } else { 3943 future.complete(aggregator.sum()); 3944 } 3945 }); 3946 }); 3947 return future; 3948 } 3949 3950 @Override 3951 public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName, 3952 boolean preserveSplits) { 3953 CompletableFuture<Void> future = new CompletableFuture<>(); 3954 addListener(tableExists(tableName), (exist, err) -> { 3955 if (err != null) { 3956 future.completeExceptionally(err); 3957 return; 3958 } 3959 if (!exist) { 3960 future.completeExceptionally(new TableNotFoundException(tableName)); 3961 return; 3962 } 3963 addListener(tableExists(newTableName), (exist1, err1) -> { 3964 if (err1 != null) { 3965 future.completeExceptionally(err1); 3966 return; 3967 } 3968 if (exist1) { 3969 future.completeExceptionally(new TableExistsException(newTableName)); 3970 return; 3971 } 3972 addListener(getDescriptor(tableName), (tableDesc, err2) -> { 3973 if (err2 != null) { 3974 future.completeExceptionally(err2); 3975 return; 3976 } 3977 TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc); 3978 if (preserveSplits) { 3979 addListener(getTableSplits(tableName), (splits, err3) -> { 3980 if (err3 != null) { 3981 future.completeExceptionally(err3); 3982 } else { 3983 addListener( 3984 splits != null ? 
createTable(newTableDesc, splits) : createTable(newTableDesc), 3985 (result, err4) -> { 3986 if (err4 != null) { 3987 future.completeExceptionally(err4); 3988 } else { 3989 future.complete(result); 3990 } 3991 }); 3992 } 3993 }); 3994 } else { 3995 addListener(createTable(newTableDesc), (result, err5) -> { 3996 if (err5 != null) { 3997 future.completeExceptionally(err5); 3998 } else { 3999 future.complete(result); 4000 } 4001 }); 4002 } 4003 }); 4004 }); 4005 }); 4006 return future; 4007 } 4008 4009 private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName, 4010 List<RegionInfo> hris) { 4011 return this.<CacheEvictionStats> newAdminCaller() 4012 .action((controller, stub) -> this.<ClearRegionBlockCacheRequest, 4013 ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub, 4014 RequestConverter.buildClearRegionBlockCacheRequest(hris), 4015 (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done), 4016 resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats()))) 4017 .serverName(serverName).call(); 4018 } 4019 4020 @Override 4021 public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) { 4022 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4023 .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, 4024 Boolean> call(controller, stub, 4025 SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(), 4026 (s, c, req, done) -> s.switchRpcThrottle(c, req, done), 4027 resp -> resp.getPreviousRpcThrottleEnabled())) 4028 .call(); 4029 return future; 4030 } 4031 4032 @Override 4033 public CompletableFuture<Boolean> isRpcThrottleEnabled() { 4034 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4035 .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, 4036 Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(), 4037 (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done), 4038 resp -> resp.getRpcThrottleEnabled())) 4039 .call(); 4040 return future; 4041 } 4042 4043 @Override 4044 public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) { 4045 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4046 .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest, 4047 SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub, 4048 SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable) 4049 .build(), 4050 (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done), 4051 resp -> resp.getPreviousExceedThrottleQuotaEnabled())) 4052 .call(); 4053 return future; 4054 } 4055 4056 @Override 4057 public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() { 4058 return this.<Map<TableName, Long>> newMasterCaller() 4059 .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest, 4060 GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub, 4061 RequestConverter.buildGetSpaceQuotaRegionSizesRequest(), 4062 (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done), 4063 resp -> resp.getSizesList().stream().collect(Collectors 4064 .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize)))) 4065 .call(); 4066 } 4067 4068 @Override 4069 public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> 4070 getRegionServerSpaceQuotaSnapshots(ServerName serverName) { 4071 return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller() 4072 
.action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest, 4073 GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, 4074 stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(), 4075 (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done), 4076 resp -> resp.getSnapshotsList().stream() 4077 .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()), 4078 snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot()))))) 4079 .serverName(serverName).call(); 4080 } 4081 4082 private CompletableFuture<SpaceQuotaSnapshot> 4083 getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) { 4084 return this.<SpaceQuotaSnapshot> newMasterCaller() 4085 .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse, 4086 SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(), 4087 (s, c, req, done) -> s.getQuotaStates(c, req, done), converter)) 4088 .call(); 4089 } 4090 4091 @Override 4092 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) { 4093 return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream() 4094 .filter(s -> s.getNamespace().equals(namespace)).findFirst() 4095 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4096 } 4097 4098 @Override 4099 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) { 4100 HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName); 4101 return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream() 4102 .filter(s -> s.getTableName().equals(protoTableName)).findFirst() 4103 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4104 } 4105 4106 @Override 4107 public CompletableFuture<Void> grant(UserPermission userPermission, 4108 boolean mergeExistingPermissions) { 4109 return this.<Void> newMasterCaller() 4110 .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub, 4111 ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions), 4112 (s, c, req, done) -> s.grant(c, req, done), resp -> null)) 4113 .call(); 4114 } 4115 4116 @Override 4117 public CompletableFuture<Void> revoke(UserPermission userPermission) { 4118 return this.<Void> newMasterCaller() 4119 .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller, 4120 stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission), 4121 (s, c, req, done) -> s.revoke(c, req, done), resp -> null)) 4122 .call(); 4123 } 4124 4125 @Override 4126 public CompletableFuture<List<UserPermission>> 4127 getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) { 4128 return this.<List<UserPermission>> newMasterCaller() 4129 .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, 4130 GetUserPermissionsResponse, List<UserPermission>> call(controller, stub, 4131 ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest), 4132 (s, c, req, done) -> s.getUserPermissions(c, req, done), 4133 resp -> resp.getUserPermissionList().stream() 4134 .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm)) 4135 .collect(Collectors.toList()))) 4136 .call(); 4137 } 4138 4139 @Override 4140 public CompletableFuture<List<Boolean>> hasUserPermissions(String userName, 4141 List<Permission> 
permissions) { 4142 return this.<List<Boolean>> newMasterCaller() 4143 .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse, 4144 List<Boolean>> call(controller, stub, 4145 ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions), 4146 (s, c, req, done) -> s.hasUserPermissions(c, req, done), 4147 resp -> resp.getHasUserPermissionList())) 4148 .call(); 4149 } 4150 4151 @Override 4152 public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) { 4153 return this.<Boolean> newMasterCaller() 4154 .action((controller, stub) -> this.call(controller, stub, 4155 RequestConverter.buildSetSnapshotCleanupRequest(on, sync), 4156 MasterService.Interface::switchSnapshotCleanup, 4157 SetSnapshotCleanupResponse::getPrevSnapshotCleanup)) 4158 .call(); 4159 } 4160 4161 @Override 4162 public CompletableFuture<Boolean> isSnapshotCleanupEnabled() { 4163 return this.<Boolean> newMasterCaller() 4164 .action((controller, stub) -> this.call(controller, stub, 4165 RequestConverter.buildIsSnapshotCleanupEnabledRequest(), 4166 MasterService.Interface::isSnapshotCleanupEnabled, 4167 IsSnapshotCleanupEnabledResponse::getEnabled)) 4168 .call(); 4169 } 4170 4171 private CompletableFuture<List<LogEntry>> getSlowLogResponses( 4172 final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit, 4173 final String logType) { 4174 if (CollectionUtils.isEmpty(serverNames)) { 4175 return CompletableFuture.completedFuture(Collections.emptyList()); 4176 } 4177 return CompletableFuture.supplyAsync(() -> serverNames 4178 .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName, 4179 filterParams, limit, logType)) 4180 .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList())); 4181 } 4182 4183 private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName, 4184 Map<String, Object> filterParams, int limit, String logType) { 4185 return this.<List<LogEntry>> newAdminCaller() 4186 .action((controller, stub) -> this.adminCall(controller, stub, 4187 RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType), 4188 AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads)) 4189 .serverName(serverName).call(); 4190 } 4191 4192 @Override 4193 public CompletableFuture<List<Boolean>> 4194 clearSlowLogResponses(@Nullable Set<ServerName> serverNames) { 4195 if (CollectionUtils.isEmpty(serverNames)) { 4196 return CompletableFuture.completedFuture(Collections.emptyList()); 4197 } 4198 List<CompletableFuture<Boolean>> clearSlowLogResponseList = 4199 serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList()); 4200 return convertToFutureOfList(clearSlowLogResponseList); 4201 } 4202 4203 private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) { 4204 return this.<Boolean> newAdminCaller() 4205 .action((controller, stub) -> this.adminCall(controller, stub, 4206 RequestConverter.buildClearSlowLogResponseRequest(), 4207 AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload)) 4208 .serverName(serverName).call(); 4209 } 4210 4211 private static <T> CompletableFuture<List<T>> 4212 convertToFutureOfList(List<CompletableFuture<T>> futures) { 4213 CompletableFuture<Void> allDoneFuture = 4214 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); 4215 return allDoneFuture 4216 .thenApply(v -> 
futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); 4217 } 4218 4219 private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) { 4220 return this.<List<LogEntry>> newMasterCaller() 4221 .action((controller, stub) -> this.call(controller, stub, 4222 ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries, 4223 ProtobufUtil::toBalancerDecisionResponse)) 4224 .call(); 4225 } 4226 4227 private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) { 4228 return this.<List<LogEntry>> newMasterCaller() 4229 .action((controller, stub) -> this.call(controller, stub, 4230 ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries, 4231 ProtobufUtil::toBalancerRejectionResponse)) 4232 .call(); 4233 } 4234 4235 @Override 4236 public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, 4237 String logType, ServerType serverType, int limit, Map<String, Object> filterParams) { 4238 if (logType == null || serverType == null) { 4239 throw new IllegalArgumentException("logType and/or serverType cannot be empty"); 4240 } 4241 switch (logType) { 4242 case "SLOW_LOG": 4243 case "LARGE_LOG": 4244 if (ServerType.MASTER.equals(serverType)) { 4245 throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster"); 4246 } 4247 return getSlowLogResponses(filterParams, serverNames, limit, logType); 4248 case "BALANCER_DECISION": 4249 if (ServerType.REGION_SERVER.equals(serverType)) { 4250 throw new IllegalArgumentException( 4251 "Balancer Decision logs are not maintained by HRegionServer"); 4252 } 4253 return getBalancerDecisions(limit); 4254 case "BALANCER_REJECTION": 4255 if (ServerType.REGION_SERVER.equals(serverType)) { 4256 throw new IllegalArgumentException( 4257 "Balancer Rejection logs are not maintained by HRegionServer"); 4258 } 4259 return getBalancerRejections(limit); 4260 default: 4261 return CompletableFuture.completedFuture(Collections.emptyList()); 4262 } 4263 } 4264 4265 @Override 4266 public CompletableFuture<Void> flushMasterStore() { 4267 FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder(); 4268 return this.<Void> newMasterCaller() 4269 .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse, 4270 Void> call(controller, stub, request.build(), 4271 (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null)) 4272 .call(); 4273 } 4274 4275 @Override 4276 public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) { 4277 GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder(); 4278 return this.<List<String>> newAdminCaller() 4279 .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse, 4280 List<String>> adminCall(controller, stub, request.build(), 4281 (s, c, req, done) -> s.getCachedFilesList(c, req, done), 4282 resp -> resp.getCachedFilesList())) 4283 .serverName(serverName).call(); 4284 } 4285}
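
// Illustrative usage (the configuration and call below are examples, not part of this file): an
// instance of this admin is normally obtained from an AsyncConnection, e.g.
//
//   ConnectionFactory.createAsyncConnection(conf)
//     .thenCompose(conn -> conn.getAdmin().flushMasterStore())
//     .whenComplete((v, err) -> { /* handle completion */ });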