001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Optional;
027import java.util.Set;
028import java.util.stream.Collectors;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.BalanceRequest;
032import org.apache.hadoop.hbase.client.BalanceResponse;
033import org.apache.hadoop.hbase.client.TableDescriptor;
034import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
035import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
036import org.apache.hadoop.hbase.master.MasterServices;
037import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
038import org.apache.hadoop.hbase.net.Address;
039import org.apache.hadoop.hbase.procedure2.Procedure;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
044import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
045import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
046
047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersAndTablesRequest;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersAndTablesResponse;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveTablesRequest;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveTablesResponse;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
070import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
074
075/**
076 * Implementation of RSGroupAdminService defined in RSGroupAdmin.proto. This class calls
077 * {@link RSGroupInfoManagerImpl} for actual work, converts result to protocol buffer response,
078 * handles exceptions if any occurred and then calls the {@code RpcCallback} with the response.
079 * @deprecated Keep it here only for compatibility with {@link RSGroupAdminClient}, using
080 *             {@link org.apache.hadoop.hbase.master.MasterRpcServices} instead.
081 */
082@Deprecated
083class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService {
084
085  private static final Logger LOG = LoggerFactory.getLogger(RSGroupAdminServiceImpl.class);
086
087  private MasterServices master;
088
089  private RSGroupInfoManager rsGroupInfoManager;
090
091  RSGroupAdminServiceImpl() {
092  }
093
094  void initialize(MasterServices masterServices) {
095    this.master = masterServices;
096    this.rsGroupInfoManager = masterServices.getRSGroupInfoManager();
097  }
098
099  // for backward compatible
100  private RSGroupInfo fillTables(RSGroupInfo rsGroupInfo) throws IOException {
101    return RSGroupUtil.fillTables(rsGroupInfo, master.getTableDescriptors().getAll().values());
102  }
103
104  @Override
105  public void getRSGroupInfo(RpcController controller, GetRSGroupInfoRequest request,
106    RpcCallback<GetRSGroupInfoResponse> done) {
107    GetRSGroupInfoResponse.Builder builder = GetRSGroupInfoResponse.newBuilder();
108    String groupName = request.getRSGroupName();
109    LOG.info(
110      master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, group=" + groupName);
111    try {
112      if (master.getMasterCoprocessorHost() != null) {
113        master.getMasterCoprocessorHost().preGetRSGroupInfo(groupName);
114      }
115      RSGroupInfo rsGroupInfo = rsGroupInfoManager.getRSGroup(groupName);
116      if (rsGroupInfo != null) {
117        builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(fillTables(rsGroupInfo)));
118      }
119      if (master.getMasterCoprocessorHost() != null) {
120        master.getMasterCoprocessorHost().postGetRSGroupInfo(groupName);
121      }
122    } catch (IOException e) {
123      CoprocessorRpcUtils.setControllerException(controller, e);
124    }
125    done.run(builder.build());
126  }
127
128  @Override
129  public void getRSGroupInfoOfTable(RpcController controller, GetRSGroupInfoOfTableRequest request,
130    RpcCallback<GetRSGroupInfoOfTableResponse> done) {
131    GetRSGroupInfoOfTableResponse.Builder builder = GetRSGroupInfoOfTableResponse.newBuilder();
132    TableName tableName = ProtobufUtil.toTableName(request.getTableName());
133    LOG.info(
134      master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, table=" + tableName);
135    try {
136      if (master.getMasterCoprocessorHost() != null) {
137        master.getMasterCoprocessorHost().preGetRSGroupInfoOfTable(tableName);
138      }
139      Optional<RSGroupInfo> optGroup =
140        RSGroupUtil.getRSGroupInfo(master, rsGroupInfoManager, tableName);
141      if (optGroup.isPresent()) {
142        builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(fillTables(optGroup.get())));
143      } else {
144        if (master.getTableStateManager().isTablePresent(tableName)) {
145          RSGroupInfo rsGroupInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
146          builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(fillTables(rsGroupInfo)));
147        }
148      }
149
150      if (master.getMasterCoprocessorHost() != null) {
151        master.getMasterCoprocessorHost().postGetRSGroupInfoOfTable(tableName);
152      }
153    } catch (IOException e) {
154      CoprocessorRpcUtils.setControllerException(controller, e);
155    }
156    done.run(builder.build());
157  }
158
159  @Override
160  public void moveServers(RpcController controller, MoveServersRequest request,
161    RpcCallback<MoveServersResponse> done) {
162    MoveServersResponse.Builder builder = MoveServersResponse.newBuilder();
163    Set<Address> hostPorts = Sets.newHashSet();
164    for (HBaseProtos.ServerName el : request.getServersList()) {
165      hostPorts.add(Address.fromParts(el.getHostName(), el.getPort()));
166    }
167    LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts + " to rsgroup "
168      + request.getTargetGroup());
169    try {
170      if (master.getMasterCoprocessorHost() != null) {
171        master.getMasterCoprocessorHost().preMoveServers(hostPorts, request.getTargetGroup());
172      }
173      rsGroupInfoManager.moveServers(hostPorts, request.getTargetGroup());
174      if (master.getMasterCoprocessorHost() != null) {
175        master.getMasterCoprocessorHost().postMoveServers(hostPorts, request.getTargetGroup());
176      }
177    } catch (IOException e) {
178      CoprocessorRpcUtils.setControllerException(controller, e);
179    }
180    done.run(builder.build());
181  }
182
183  private void moveTablesAndWait(Set<TableName> tables, String targetGroup) throws IOException {
184    List<Long> procIds = new ArrayList<Long>();
185    for (TableName tableName : tables) {
186      TableDescriptor oldTd = master.getTableDescriptors().get(tableName);
187      if (oldTd == null) {
188        continue;
189      }
190      TableDescriptor newTd =
191        TableDescriptorBuilder.newBuilder(oldTd).setRegionServerGroup(targetGroup).build();
192      procIds.add(master.modifyTable(tableName, newTd, HConstants.NO_NONCE, HConstants.NO_NONCE));
193    }
194    for (long procId : procIds) {
195      Procedure<?> proc = master.getMasterProcedureExecutor().getProcedure(procId);
196      if (proc == null) {
197        continue;
198      }
199      ProcedureSyncWait.waitForProcedureToCompleteIOE(master.getMasterProcedureExecutor(), proc,
200        Long.MAX_VALUE);
201    }
202  }
203
204  @Override
205  public void moveTables(RpcController controller, MoveTablesRequest request,
206    RpcCallback<MoveTablesResponse> done) {
207    MoveTablesResponse.Builder builder = MoveTablesResponse.newBuilder();
208    Set<TableName> tables = new HashSet<>(request.getTableNameList().size());
209    for (HBaseProtos.TableName tableName : request.getTableNameList()) {
210      tables.add(ProtobufUtil.toTableName(tableName));
211    }
212    LOG.info(master.getClientIdAuditPrefix() + " move tables " + tables + " to rsgroup "
213      + request.getTargetGroup());
214    try {
215      if (master.getMasterCoprocessorHost() != null) {
216        master.getMasterCoprocessorHost().preMoveTables(tables, request.getTargetGroup());
217      }
218      moveTablesAndWait(tables, request.getTargetGroup());
219      if (master.getMasterCoprocessorHost() != null) {
220        master.getMasterCoprocessorHost().postMoveTables(tables, request.getTargetGroup());
221      }
222    } catch (IOException e) {
223      CoprocessorRpcUtils.setControllerException(controller, e);
224    }
225    done.run(builder.build());
226  }
227
228  @Override
229  public void addRSGroup(RpcController controller, AddRSGroupRequest request,
230    RpcCallback<AddRSGroupResponse> done) {
231    AddRSGroupResponse.Builder builder = AddRSGroupResponse.newBuilder();
232    LOG.info(master.getClientIdAuditPrefix() + " add rsgroup " + request.getRSGroupName());
233    try {
234      if (master.getMasterCoprocessorHost() != null) {
235        master.getMasterCoprocessorHost().preAddRSGroup(request.getRSGroupName());
236      }
237      rsGroupInfoManager.addRSGroup(new RSGroupInfo(request.getRSGroupName()));
238      if (master.getMasterCoprocessorHost() != null) {
239        master.getMasterCoprocessorHost().postAddRSGroup(request.getRSGroupName());
240      }
241    } catch (IOException e) {
242      CoprocessorRpcUtils.setControllerException(controller, e);
243    }
244    done.run(builder.build());
245  }
246
247  @Override
248  public void removeRSGroup(RpcController controller, RemoveRSGroupRequest request,
249    RpcCallback<RemoveRSGroupResponse> done) {
250    RemoveRSGroupResponse.Builder builder = RemoveRSGroupResponse.newBuilder();
251    LOG.info(master.getClientIdAuditPrefix() + " remove rsgroup " + request.getRSGroupName());
252    try {
253      if (master.getMasterCoprocessorHost() != null) {
254        master.getMasterCoprocessorHost().preRemoveRSGroup(request.getRSGroupName());
255      }
256      rsGroupInfoManager.removeRSGroup(request.getRSGroupName());
257      if (master.getMasterCoprocessorHost() != null) {
258        master.getMasterCoprocessorHost().postRemoveRSGroup(request.getRSGroupName());
259      }
260    } catch (IOException e) {
261      CoprocessorRpcUtils.setControllerException(controller, e);
262    }
263    done.run(builder.build());
264  }
265
266  @Override
267  public void balanceRSGroup(RpcController controller, BalanceRSGroupRequest request,
268    RpcCallback<BalanceRSGroupResponse> done) {
269    BalanceRequest balanceRequest = ProtobufUtil.toBalanceRequest(request);
270    BalanceRSGroupResponse.Builder builder =
271      BalanceRSGroupResponse.newBuilder().setBalanceRan(false);
272
273    LOG.info(
274      master.getClientIdAuditPrefix() + " balance rsgroup, group=" + request.getRSGroupName());
275    try {
276      if (master.getMasterCoprocessorHost() != null) {
277        master.getMasterCoprocessorHost().preBalanceRSGroup(request.getRSGroupName(),
278          balanceRequest);
279      }
280
281      BalanceResponse response =
282        rsGroupInfoManager.balanceRSGroup(request.getRSGroupName(), balanceRequest);
283      ProtobufUtil.populateBalanceRSGroupResponse(builder, response);
284
285      if (master.getMasterCoprocessorHost() != null) {
286        master.getMasterCoprocessorHost().postBalanceRSGroup(request.getRSGroupName(),
287          balanceRequest, response);
288      }
289    } catch (IOException e) {
290      CoprocessorRpcUtils.setControllerException(controller, e);
291    }
292    done.run(builder.build());
293  }
294
295  @Override
296  public void listRSGroupInfos(RpcController controller, ListRSGroupInfosRequest request,
297    RpcCallback<ListRSGroupInfosResponse> done) {
298    ListRSGroupInfosResponse.Builder builder = ListRSGroupInfosResponse.newBuilder();
299    LOG.info(master.getClientIdAuditPrefix() + " list rsgroup");
300    try {
301      if (master.getMasterCoprocessorHost() != null) {
302        master.getMasterCoprocessorHost().preListRSGroups();
303      }
304      List<RSGroupInfo> rsGroupInfos = rsGroupInfoManager.listRSGroups().stream()
305        .map(RSGroupInfo::new).collect(Collectors.toList());
306      Map<String, RSGroupInfo> name2Info = new HashMap<>();
307      for (RSGroupInfo rsGroupInfo : rsGroupInfos) {
308        name2Info.put(rsGroupInfo.getName(), rsGroupInfo);
309      }
310      for (TableDescriptor td : master.getTableDescriptors().getAll().values()) {
311        String groupName = td.getRegionServerGroup().orElse(RSGroupInfo.DEFAULT_GROUP);
312        RSGroupInfo rsGroupInfo = name2Info.get(groupName);
313        if (rsGroupInfo != null) {
314          rsGroupInfo.addTable(td.getTableName());
315        }
316      }
317      for (RSGroupInfo rsGroupInfo : rsGroupInfos) {
318        // TODO: this can be done at once outside this loop, do not need to scan all every time.
319        builder.addRSGroupInfo(ProtobufUtil.toProtoGroupInfo(rsGroupInfo));
320      }
321      if (master.getMasterCoprocessorHost() != null) {
322        master.getMasterCoprocessorHost().postListRSGroups();
323      }
324    } catch (IOException e) {
325      CoprocessorRpcUtils.setControllerException(controller, e);
326    }
327    done.run(builder.build());
328  }
329
330  @Override
331  public void getRSGroupInfoOfServer(RpcController controller,
332    GetRSGroupInfoOfServerRequest request, RpcCallback<GetRSGroupInfoOfServerResponse> done) {
333    GetRSGroupInfoOfServerResponse.Builder builder = GetRSGroupInfoOfServerResponse.newBuilder();
334    Address hp =
335      Address.fromParts(request.getServer().getHostName(), request.getServer().getPort());
336    LOG.info(master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, server=" + hp);
337    try {
338      if (master.getMasterCoprocessorHost() != null) {
339        master.getMasterCoprocessorHost().preGetRSGroupInfoOfServer(hp);
340      }
341      RSGroupInfo info = rsGroupInfoManager.getRSGroupOfServer(hp);
342      if (info != null) {
343        builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(fillTables(info)));
344      }
345      if (master.getMasterCoprocessorHost() != null) {
346        master.getMasterCoprocessorHost().postGetRSGroupInfoOfServer(hp);
347      }
348    } catch (IOException e) {
349      CoprocessorRpcUtils.setControllerException(controller, e);
350    }
351    done.run(builder.build());
352  }
353
354  @Override
355  public void moveServersAndTables(RpcController controller, MoveServersAndTablesRequest request,
356    RpcCallback<MoveServersAndTablesResponse> done) {
357    MoveServersAndTablesResponse.Builder builder = MoveServersAndTablesResponse.newBuilder();
358    Set<Address> hostPorts = Sets.newHashSet();
359    for (HBaseProtos.ServerName el : request.getServersList()) {
360      hostPorts.add(Address.fromParts(el.getHostName(), el.getPort()));
361    }
362    Set<TableName> tables = new HashSet<>(request.getTableNameList().size());
363    for (HBaseProtos.TableName tableName : request.getTableNameList()) {
364      tables.add(ProtobufUtil.toTableName(tableName));
365    }
366    LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts + " and tables "
367      + tables + " to rsgroup" + request.getTargetGroup());
368    try {
369      if (master.getMasterCoprocessorHost() != null) {
370        master.getMasterCoprocessorHost().preMoveServersAndTables(hostPorts, tables,
371          request.getTargetGroup());
372      }
373      rsGroupInfoManager.moveServers(hostPorts, request.getTargetGroup());
374      moveTablesAndWait(tables, request.getTargetGroup());
375      if (master.getMasterCoprocessorHost() != null) {
376        master.getMasterCoprocessorHost().postMoveServersAndTables(hostPorts, tables,
377          request.getTargetGroup());
378      }
379    } catch (IOException e) {
380      CoprocessorRpcUtils.setControllerException(controller, e);
381    }
382    done.run(builder.build());
383  }
384
385  @Override
386  public void removeServers(RpcController controller, RemoveServersRequest request,
387    RpcCallback<RemoveServersResponse> done) {
388    RemoveServersResponse.Builder builder = RemoveServersResponse.newBuilder();
389    Set<Address> servers = Sets.newHashSet();
390    for (HBaseProtos.ServerName el : request.getServersList()) {
391      servers.add(Address.fromParts(el.getHostName(), el.getPort()));
392    }
393    LOG.info(
394      master.getClientIdAuditPrefix() + " remove decommissioned servers from rsgroup: " + servers);
395    try {
396      if (master.getMasterCoprocessorHost() != null) {
397        master.getMasterCoprocessorHost().preRemoveServers(servers);
398      }
399      rsGroupInfoManager.removeServers(servers);
400      if (master.getMasterCoprocessorHost() != null) {
401        master.getMasterCoprocessorHost().postRemoveServers(servers);
402      }
403    } catch (IOException e) {
404      CoprocessorRpcUtils.setControllerException(controller, e);
405    }
406    done.run(builder.build());
407  }
408
409  @Override
410  public void renameRSGroup(RpcController controller, RenameRSGroupRequest request,
411    RpcCallback<RenameRSGroupResponse> done) {
412    String oldRSGroup = request.getOldRsgroupName();
413    String newRSGroup = request.getNewRsgroupName();
414    LOG.info("{} rename rsgroup from {} to {}", master.getClientIdAuditPrefix(), oldRSGroup,
415      newRSGroup);
416
417    RenameRSGroupResponse.Builder builder = RenameRSGroupResponse.newBuilder();
418    try {
419      if (master.getMasterCoprocessorHost() != null) {
420        master.getMasterCoprocessorHost().preRenameRSGroup(oldRSGroup, newRSGroup);
421      }
422      rsGroupInfoManager.renameRSGroup(oldRSGroup, newRSGroup);
423      if (master.getMasterCoprocessorHost() != null) {
424        master.getMasterCoprocessorHost().postRenameRSGroup(oldRSGroup, newRSGroup);
425      }
426    } catch (IOException e) {
427      CoprocessorRpcUtils.setControllerException(controller, e);
428    }
429    done.run(builder.build());
430  }
431
432}