001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import com.google.protobuf.ServiceException;
021import java.io.ByteArrayInputStream;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.NavigableSet;
031import java.util.OptionalLong;
032import java.util.Set;
033import java.util.SortedSet;
034import java.util.TreeSet;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.Coprocessor;
037import org.apache.hadoop.hbase.DoNotRetryIOException;
038import org.apache.hadoop.hbase.NamespaceDescriptor;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
044import org.apache.hadoop.hbase.client.Delete;
045import org.apache.hadoop.hbase.client.Get;
046import org.apache.hadoop.hbase.client.Mutation;
047import org.apache.hadoop.hbase.client.Put;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.ResultScanner;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.client.Table;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
054import org.apache.hadoop.hbase.constraint.ConstraintException;
055import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
056import org.apache.hadoop.hbase.exceptions.DeserializationException;
057import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
058import org.apache.hadoop.hbase.master.ClusterSchema;
059import org.apache.hadoop.hbase.master.MasterServices;
060import org.apache.hadoop.hbase.master.ServerListener;
061import org.apache.hadoop.hbase.master.TableStateManager;
062import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
063import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
064import org.apache.hadoop.hbase.net.Address;
065import org.apache.hadoop.hbase.procedure2.Procedure;
066import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
067import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
068import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
069import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
070import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
071import org.apache.hadoop.hbase.util.Bytes;
072import org.apache.hadoop.hbase.util.Threads;
073import org.apache.hadoop.hbase.zookeeper.ZKUtil;
074import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
075import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
076import org.apache.hadoop.util.Shell;
077import org.apache.yetus.audience.InterfaceAudience;
078import org.apache.zookeeper.KeeperException;
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
082import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
083import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
084import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
085
086/**
087 * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the
088 * persistence store for the group information. It also makes use of zookeeper to store group
089 * information needed for bootstrapping during offline mode.
 * <h2>Concurrency</h2> RSGroup state is kept locally in Maps: {@link #rsGroupMap} maps each rsgroup
 * name to its cached RSGroupInfo, and {@link #tableMap} maps each table to the name of the rsgroup it
 * belongs to. These Maps are persisted to the hbase:rsgroup table (and cached in zk) on each
 * modification.
094 * <p>
 * Mutations on state are synchronized, but reads can proceed without waiting on the instance
 * monitor: each mutation replaces the Maps wholesale (Copy-On-Write), and the local Maps of state
 * are read-only, just in case (see flushConfig).
098 * <p>
099 * Reads must not block else there is a danger we'll deadlock.
100 * <p>
 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
 * on the results of the query (modifying the cache in zookeeper) without another thread making
 * intermediate modifications. These clients synchronize on this instance so that no other client
 * has access concurrently. Reads must still be able to proceed concurrently.
105 */
106@InterfaceAudience.Private
107final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
108  private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);
109
110  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
111  private static final TableDescriptor RSGROUP_TABLE_DESC;
112  static {
113    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
114      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
115      .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
116    try {
117      builder.setCoprocessor(
118        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
119          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
120    } catch (IOException ex) {
121      throw new Error(ex);
122    }
123    RSGROUP_TABLE_DESC = builder.build();
124  }
125
  // These two Maps are immutable and wholesale replaced on each modification
  // so are safe to access concurrently. See class comment.
  /** rsgroup name -> RSGroupInfo of that group. */
  private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
  /** table -> name of the rsgroup the table belongs to. */
  private volatile Map<TableName, String> tableMap = Collections.emptyMap();

  private final MasterServices masterServices;
  private final Connection conn;
  private final ZKWatcher watcher;
  private final RSGroupStartupWorker rsGroupStartupWorker;
  // Names of the groups that were last flushed to the persistent store; a flush uses it to find
  // the rows/znodes of groups that no longer exist and must be deleted.
  private Set<String> prevRSGroups = new HashSet<>();
  // Registered with the ServerManager; keeps the default group's server list up to date.
  private final ServerEventsListenerThread serverEventsListenerThread =
    new ServerEventsListenerThread();

  /** Optional admin-provided script consulted when choosing an rsgroup for a table. */
  RSGroupMappingScript script;
142
143  // Package visibility for testing
144  static class RSGroupMappingScript {
145
146    static final String RS_GROUP_MAPPING_SCRIPT = "hbase.rsgroup.table.mapping.script";
147    static final String RS_GROUP_MAPPING_SCRIPT_TIMEOUT =
148      "hbase.rsgroup.table.mapping.script.timeout";
149
150    private final String script;
151    private final long scriptTimeout;
152
153    RSGroupMappingScript(Configuration conf) {
154      script = conf.get(RS_GROUP_MAPPING_SCRIPT);
155      scriptTimeout = conf.getLong(RS_GROUP_MAPPING_SCRIPT_TIMEOUT, 5000); // 5 seconds
156    }
157
158    String getRSGroup(String namespace, String tablename) {
159      if (script == null || script.isEmpty()) {
160        return null;
161      }
162      Shell.ShellCommandExecutor rsgroupMappingScript =
163        new Shell.ShellCommandExecutor(new String[] { script, "", "" }, null, null, scriptTimeout);
164
165      String[] exec = rsgroupMappingScript.getExecString();
166      exec[1] = namespace;
167      exec[2] = tablename;
168      try {
169        rsgroupMappingScript.execute();
170      } catch (IOException e) {
171        LOG.error("Failed to get RSGroup from script for table {}:{}", namespace, tablename, e);
172        return null;
173      }
174      return rsgroupMappingScript.getOutput().trim();
175    }
176  }
177
178  private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
179    this.masterServices = masterServices;
180    this.watcher = masterServices.getZooKeeper();
181    this.conn = masterServices.getConnection();
182    this.rsGroupStartupWorker = new RSGroupStartupWorker();
183    script = new RSGroupMappingScript(masterServices.getConfiguration());
184  }
185
186  private synchronized void init() throws IOException {
187    refresh();
188    serverEventsListenerThread.start();
189    masterServices.getServerManager().registerListener(serverEventsListenerThread);
190  }
191
192  static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
193    RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master);
194    instance.init();
195    return instance;
196  }
197
198  public void start() {
199    // create system table of rsgroup
200    rsGroupStartupWorker.start();
201  }
202
203  @Override
204  public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
205    checkGroupName(rsGroupInfo.getName());
206    if (
207      rsGroupMap.get(rsGroupInfo.getName()) != null
208        || rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)
209    ) {
210      throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName());
211    }
212    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
213    newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
214    flushConfig(newGroupMap);
215  }
216
217  private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException {
218    RSGroupInfo rsGroupInfo = getRSGroup(groupName);
219    if (rsGroupInfo == null) {
220      throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist");
221    }
222    return rsGroupInfo;
223  }
224
225  /**
226   * @param master the master to get online servers for
227   * @return Set of online Servers named for their hostname and port (not ServerName).
228   */
229  private static Set<Address> getOnlineServers(final MasterServices master) {
230    Set<Address> onlineServers = new HashSet<Address>();
231    if (master == null) {
232      return onlineServers;
233    }
234
235    for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
236      onlineServers.add(server.getAddress());
237    }
238    return onlineServers;
239  }
240
241  @Override
242  public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
243    String dstGroup) throws IOException {
244    RSGroupInfo src = getRSGroupInfo(srcGroup);
245    RSGroupInfo dst = getRSGroupInfo(dstGroup);
246    Set<Address> movedServers = new HashSet<>();
247    // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
248    // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
249    // rsgroup of dead servers that are to come back later).
250    Set<Address> onlineServers = dst.getName().equals(RSGroupInfo.DEFAULT_GROUP)
251      ? getOnlineServers(this.masterServices)
252      : null;
253    for (Address el : servers) {
254      src.removeServer(el);
255      if (onlineServers != null) {
256        if (!onlineServers.contains(el)) {
257          if (LOG.isDebugEnabled()) {
258            LOG.debug("Dropping " + el + " during move-to-default rsgroup because not online");
259          }
260          continue;
261        }
262      }
263      dst.addServer(el);
264      movedServers.add(el);
265    }
266    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
267    newGroupMap.put(src.getName(), src);
268    newGroupMap.put(dst.getName(), dst);
269    flushConfig(newGroupMap);
270    return movedServers;
271  }
272
273  @Override
274  public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
275    for (RSGroupInfo info : rsGroupMap.values()) {
276      if (info.containsServer(serverHostPort)) {
277        return info;
278      }
279    }
280    return null;
281  }
282
283  @Override
284  public RSGroupInfo getRSGroup(String groupName) {
285    return rsGroupMap.get(groupName);
286  }
287
288  @Override
289  public String getRSGroupOfTable(TableName tableName) {
290    return tableMap.get(tableName);
291  }
292
293  @Override
294  public synchronized void moveTables(Set<TableName> tableNames, String groupName)
295    throws IOException {
296    // Check if rsGroup contains the destination rsgroup
297    if (groupName != null && !rsGroupMap.containsKey(groupName)) {
298      throw new DoNotRetryIOException("Group " + groupName + " does not exist");
299    }
300
301    // Make a copy of rsGroupMap to update
302    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
303
304    // Remove tables from their original rsgroups
305    // and update the copy of rsGroupMap
306    for (TableName tableName : tableNames) {
307      if (tableMap.containsKey(tableName)) {
308        RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
309        src.removeTable(tableName);
310        newGroupMap.put(src.getName(), src);
311      }
312    }
313
314    // Add tables to the destination rsgroup
315    // and update the copy of rsGroupMap
316    if (groupName != null) {
317      RSGroupInfo dstGroup = new RSGroupInfo(newGroupMap.get(groupName));
318      dstGroup.addAllTables(tableNames);
319      newGroupMap.put(dstGroup.getName(), dstGroup);
320    }
321
322    // Flush according to the updated copy of rsGroupMap
323    flushConfig(newGroupMap);
324  }
325
326  @Override
327  public synchronized void removeRSGroup(String groupName) throws IOException {
328    if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
329      throw new DoNotRetryIOException(
330        "Group " + groupName + " does not exist or is a reserved " + "group");
331    }
332    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
333    newGroupMap.remove(groupName);
334    flushConfig(newGroupMap);
335  }
336
337  @Override
338  public List<RSGroupInfo> listRSGroups() {
339    return Lists.newLinkedList(rsGroupMap.values());
340  }
341
342  @Override
343  public boolean isOnline() {
344    return rsGroupStartupWorker.isOnline();
345  }
346
347  @Override
348  public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String srcGroup,
349    String dstGroup) throws IOException {
350    // get server's group
351    RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup);
352    RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup);
353
354    // move servers
355    for (Address el : servers) {
356      srcGroupInfo.removeServer(el);
357      dstGroupInfo.addServer(el);
358    }
359    // move tables
360    for (TableName tableName : tables) {
361      srcGroupInfo.removeTable(tableName);
362      dstGroupInfo.addTable(tableName);
363    }
364
365    // flush changed groupinfo
366    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
367    newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
368    newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
369    flushConfig(newGroupMap);
370  }
371
372  @Override
373  public synchronized void removeServers(Set<Address> servers) throws IOException {
374    Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
375    for (Address el : servers) {
376      RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
377      if (rsGroupInfo != null) {
378        RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
379        if (newRsGroupInfo == null) {
380          rsGroupInfo.removeServer(el);
381          rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo);
382        } else {
383          newRsGroupInfo.removeServer(el);
384          rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
385        }
386      } else {
387        LOG.warn("Server " + el + " does not belong to any rsgroup.");
388      }
389    }
390
391    if (rsGroupInfos.size() > 0) {
392      Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
393      newGroupMap.putAll(rsGroupInfos);
394      flushConfig(newGroupMap);
395    }
396  }
397
398  @Override
399  public void renameRSGroup(String oldName, String newName) throws IOException {
400    checkGroupName(oldName);
401    checkGroupName(newName);
402    if (oldName.equals(RSGroupInfo.DEFAULT_GROUP)) {
403      throw new ConstraintException("Can't rename default rsgroup");
404    }
405    RSGroupInfo oldGroup = getRSGroup(oldName);
406    if (oldGroup == null) {
407      throw new ConstraintException("RSGroup " + oldName + " does not exist");
408    }
409    if (rsGroupMap.containsKey(newName)) {
410      throw new ConstraintException("Group already exists: " + newName);
411    }
412
413    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
414    newGroupMap.remove(oldName);
415    RSGroupInfo newGroup =
416      new RSGroupInfo(newName, (SortedSet<Address>) oldGroup.getServers(), oldGroup.getTables());
417    newGroupMap.put(newName, newGroup);
418    flushConfig(newGroupMap);
419  }
420
421  /**
422   * Will try to get the rsgroup from {@code tableMap} first then try to get the rsgroup from
423   * {@code script} try to get the rsgroup from the {@link NamespaceDescriptor} lastly. If still not
424   * present, return default group.
425   */
426  @Override
427  public RSGroupInfo determineRSGroupInfoForTable(TableName tableName) throws IOException {
428    RSGroupInfo groupFromOldRSGroupInfo = getRSGroup(getRSGroupOfTable(tableName));
429    if (groupFromOldRSGroupInfo != null) {
430      return groupFromOldRSGroupInfo;
431    }
432    // RSGroup information determined by administrator.
433    RSGroupInfo groupDeterminedByAdmin = getRSGroup(
434      script.getRSGroup(tableName.getNamespaceAsString(), tableName.getQualifierAsString()));
435    if (groupDeterminedByAdmin != null) {
436      return groupDeterminedByAdmin;
437    }
438    // Finally, we will try to fall back to namespace as rsgroup if exists
439    ClusterSchema clusterSchema = masterServices.getClusterSchema();
440    if (clusterSchema == null) {
441      if (TableName.isMetaTableName(tableName)) {
442        LOG.info("Can not get the namespace rs group config for meta table, since the"
443          + " meta table is not online yet, will use default group to assign meta first");
444      } else {
445        LOG.warn("ClusterSchema is null, can only use default rsgroup, should not happen?");
446      }
447    } else {
448      NamespaceDescriptor nd = clusterSchema.getNamespace(tableName.getNamespaceAsString());
449      RSGroupInfo groupNameOfNs =
450        getRSGroup(nd.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP));
451      if (groupNameOfNs != null) {
452        return groupNameOfNs;
453      }
454    }
455    return getRSGroup(RSGroupInfo.DEFAULT_GROUP);
456  }
457
458  @Override
459  public void updateRSGroupConfig(String groupName, Map<String, String> configuration)
460    throws IOException {
461    if (RSGroupInfo.DEFAULT_GROUP.equals(groupName)) {
462      // We do not persist anything of default group, therefore, it is not supported to update
463      // default group's configuration which lost once master down.
464      throw new ConstraintException(
465        "configuration of " + RSGroupInfo.DEFAULT_GROUP + " can't be stored persistently");
466    }
467    RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName);
468    new HashSet<>(rsGroupInfo.getConfiguration().keySet())
469      .forEach(rsGroupInfo::removeConfiguration);
470    configuration.forEach(rsGroupInfo::setConfiguration);
471    flushConfig();
472  }
473
474  List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
475    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
476    try (Table table = conn.getTable(RSGROUP_TABLE_NAME);
477      ResultScanner scanner = table.getScanner(new Scan())) {
478      for (Result result;;) {
479        result = scanner.next();
480        if (result == null) {
481          break;
482        }
483        RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
484          .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
485        rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
486      }
487    }
488    return rsGroupInfoList;
489  }
490
491  List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
492    String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
493    List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
494    // Overwrite any info stored by table, this takes precedence
495    try {
496      if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
497        List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
498        if (children == null) {
499          return RSGroupInfoList;
500        }
501        for (String znode : children) {
502          byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
503          if (data != null && data.length > 0) {
504            ProtobufUtil.expectPBMagicPrefix(data);
505            ByteArrayInputStream bis =
506              new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
507            RSGroupInfoList
508              .add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
509          }
510        }
511        LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
512      }
513    } catch (KeeperException | DeserializationException | InterruptedException e) {
514      throw new IOException("Failed to read rsGroupZNode", e);
515    }
516    return RSGroupInfoList;
517  }
518
519  @Override
520  public void refresh() throws IOException {
521    refresh(false);
522  }
523
524  /**
525   * Read rsgroup info from the source of truth, the hbase:rsgroup table. Update zk cache. Called on
526   * startup of the manager.
527   */
528  private synchronized void refresh(boolean forceOnline) throws IOException {
529    List<RSGroupInfo> groupList = new LinkedList<>();
530
531    // Overwrite anything read from zk, group table is source of truth
532    // if online read from GROUP table
533    if (forceOnline || isOnline()) {
534      LOG.debug("Refreshing in Online mode.");
535      groupList.addAll(retrieveGroupListFromGroupTable());
536    } else {
537      LOG.debug("Refreshing in Offline mode.");
538      groupList.addAll(retrieveGroupListFromZookeeper());
539    }
540
541    // refresh default group, prune
542    NavigableSet<TableName> orphanTables = new TreeSet<>();
543    for (String entry : masterServices.getTableDescriptors().getAll().keySet()) {
544      orphanTables.add(TableName.valueOf(entry));
545    }
546    for (RSGroupInfo group : groupList) {
547      if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
548        orphanTables.removeAll(group.getTables());
549      }
550    }
551
552    // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
553    // from region group table or zk
554    groupList
555      .add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(groupList), orphanTables));
556
557    // populate the data
558    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
559    HashMap<TableName, String> newTableMap = Maps.newHashMap();
560    for (RSGroupInfo group : groupList) {
561      newGroupMap.put(group.getName(), group);
562      for (TableName table : group.getTables()) {
563        newTableMap.put(table, group.getName());
564      }
565    }
566    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
567    updateCacheOfRSGroups(rsGroupMap.keySet());
568  }
569
570  private synchronized Map<TableName, String> flushConfigTable(Map<String, RSGroupInfo> groupMap)
571    throws IOException {
572    Map<TableName, String> newTableMap = Maps.newHashMap();
573    List<Mutation> mutations = Lists.newArrayList();
574
575    // populate deletes
576    for (String groupName : prevRSGroups) {
577      if (!groupMap.containsKey(groupName)) {
578        Delete d = new Delete(Bytes.toBytes(groupName));
579        mutations.add(d);
580      }
581    }
582
583    // populate puts
584    for (RSGroupInfo RSGroupInfo : groupMap.values()) {
585      RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
586      Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
587      p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
588      mutations.add(p);
589      for (TableName entry : RSGroupInfo.getTables()) {
590        newTableMap.put(entry, RSGroupInfo.getName());
591      }
592    }
593
594    if (mutations.size() > 0) {
595      multiMutate(mutations);
596    }
597    return newTableMap;
598  }
599
600  private synchronized void flushConfig() throws IOException {
601    flushConfig(this.rsGroupMap);
602  }
603
604  private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
605    Map<TableName, String> newTableMap;
606
607    // For offline mode persistence is still unavailable
608    // We're refreshing in-memory state but only for servers in default group
609    if (!isOnline()) {
610      if (newGroupMap == this.rsGroupMap) {
611        // When newGroupMap is this.rsGroupMap itself,
612        // do not need to check default group and other groups as followed
613        return;
614      }
615
616      Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(rsGroupMap);
617      RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
618      RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
619      if (
620        !oldGroupMap.equals(newGroupMap)
621          /* compare both tables and servers in other groups */ || !oldDefaultGroup.getTables()
622            .equals(newDefaultGroup.getTables())
623        /* compare tables in default group */
624      ) {
625        throw new IOException("Only servers in default group can be updated during offline mode");
626      }
627
628      // Restore newGroupMap by putting its default group back
629      newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
630
631      // Refresh rsGroupMap
632      // according to the inputted newGroupMap (an updated copy of rsGroupMap)
633      rsGroupMap = newGroupMap;
634
635      // Do not need to update tableMap
636      // because only the update on servers in default group is allowed above,
637      // or IOException will be thrown
638      return;
639    }
640
641    /* For online mode, persist to Zookeeper */
642    newTableMap = flushConfigTable(newGroupMap);
643
644    // Make changes visible after having been persisted to the source of truth
645    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
646
647    try {
648      String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
649      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);
650
651      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
652      for (String groupName : prevRSGroups) {
653        if (!newGroupMap.containsKey(groupName)) {
654          String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
655          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
656        }
657      }
658
659      for (RSGroupInfo RSGroupInfo : newGroupMap.values()) {
660        String znode = ZNodePaths.joinZNode(groupBasePath, RSGroupInfo.getName());
661        RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
662        LOG.debug("Updating znode: " + znode);
663        ZKUtil.createAndFailSilent(watcher, znode);
664        zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
665        zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
666          ProtobufUtil.prependPBMagic(proto.toByteArray())));
667      }
668      LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
669
670      ZKUtil.multiOrSequential(watcher, zkOps, false);
671    } catch (KeeperException e) {
672      LOG.error("Failed to write to rsGroupZNode", e);
673      masterServices.abort("Failed to write to rsGroupZNode", e);
674      throw new IOException("Failed to write to rsGroupZNode", e);
675    }
676    updateCacheOfRSGroups(newGroupMap.keySet());
677  }
678
679  /**
680   * Make changes visible. Caller must be synchronized on 'this'.
681   */
682  private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap,
683    Map<TableName, String> newTableMap) {
684    // Make maps Immutable.
685    this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
686    this.tableMap = Collections.unmodifiableMap(newTableMap);
687  }
688
689  /**
690   * Update cache of rsgroups. Caller must be synchronized on 'this'.
691   * @param currentGroups Current list of Groups.
692   */
693  private void updateCacheOfRSGroups(final Set<String> currentGroups) {
694    this.prevRSGroups.clear();
695    this.prevRSGroups.addAll(currentGroups);
696  }
697
698  // Called by getDefaultServers. Presume it has lock in place.
699  private List<ServerName> getOnlineRS() throws IOException {
700    if (masterServices != null) {
701      return masterServices.getServerManager().getOnlineServersList();
702    }
703    LOG.debug("Reading online RS from zookeeper");
704    List<ServerName> servers = new LinkedList<>();
705    try {
706      for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
707        servers.add(ServerName.parseServerName(el));
708      }
709    } catch (KeeperException e) {
710      throw new IOException("Failed to retrieve server list from zookeeper", e);
711    }
712    return servers;
713  }
714
715  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
716  private SortedSet<Address> getDefaultServers() throws IOException {
717    return getDefaultServers(listRSGroups());
718  }
719
720  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
721  private SortedSet<Address> getDefaultServers(List<RSGroupInfo> rsGroupInfoList)
722    throws IOException {
723    // Build a list of servers in other groups than default group, from rsGroupMap
724    Set<Address> serversInOtherGroup = new HashSet<>();
725    for (RSGroupInfo group : rsGroupInfoList) {
726      if (!RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) { // not default group
727        serversInOtherGroup.addAll(group.getServers());
728      }
729    }
730
731    // Get all online servers from Zookeeper and find out servers in default group
732    SortedSet<Address> defaultServers = Sets.newTreeSet();
733    for (ServerName serverName : getOnlineRS()) {
734      Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
735      if (!serversInOtherGroup.contains(server)) { // not in other groups
736        defaultServers.add(server);
737      }
738    }
739    return defaultServers;
740  }
741
742  // Called by ServerEventsListenerThread. Synchronize on this because redoing
743  // the rsGroupMap then writing it out.
744  private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException {
745    RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
746    RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables());
747    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
748    newGroupMap.put(newInfo.getName(), newInfo);
749    flushConfig(newGroupMap);
750  }
751
752  /**
753   * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
754   * servers. Notifications about server changes are received by registering {@link ServerListener}.
755   * As a listener, we need to return immediately, so the real work of updating the servers is done
756   * asynchronously in this thread.
757   */
758  private class ServerEventsListenerThread extends Thread implements ServerListener {
759    private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
760    private boolean changed = false;
761
762    ServerEventsListenerThread() {
763      setDaemon(true);
764    }
765
766    @Override
767    public void serverAdded(ServerName serverName) {
768      serverChanged();
769    }
770
771    @Override
772    public void serverRemoved(ServerName serverName) {
773      serverChanged();
774    }
775
776    private synchronized void serverChanged() {
777      changed = true;
778      this.notify();
779    }
780
781    @Override
782    public void run() {
783      setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
784      SortedSet<Address> prevDefaultServers = new TreeSet<>();
785      while (isMasterRunning(masterServices)) {
786        try {
787          LOG.info("Updating default servers.");
788          SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
789          if (!servers.equals(prevDefaultServers)) {
790            RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
791            prevDefaultServers = servers;
792            LOG.info("Updated with servers: " + servers.size());
793          }
794          try {
795            synchronized (this) {
796              while (!changed) {
797                wait();
798              }
799              changed = false;
800            }
801          } catch (InterruptedException e) {
802            LOG.warn("Interrupted", e);
803          }
804        } catch (IOException e) {
805          LOG.warn("Failed to update default servers", e);
806        }
807      }
808    }
809  }
810
811  private class RSGroupStartupWorker extends Thread {
812    private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
813    private volatile boolean online = false;
814
815    RSGroupStartupWorker() {
816      super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
817      setDaemon(true);
818    }
819
820    @Override
821    public void run() {
822      if (waitForGroupTableOnline()) {
823        LOG.info("GroupBasedLoadBalancer is now online");
824      } else {
825        LOG.warn("Quit without making region group table online");
826      }
827    }
828
829    private boolean waitForGroupTableOnline() {
830      while (isMasterRunning(masterServices)) {
831        try {
832          TableStateManager tsm = masterServices.getTableStateManager();
833          if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) {
834            createRSGroupTable();
835          }
836          // try reading from the table
837          try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
838            table.get(new Get(ROW_KEY));
839          }
840          LOG.info(
841            "RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information");
842          RSGroupInfoManagerImpl.this.refresh(true);
843          online = true;
844          // flush any inconsistencies between ZK and HTable
845          RSGroupInfoManagerImpl.this.flushConfig();
846          return true;
847        } catch (Exception e) {
848          LOG.warn("Failed to perform check", e);
849          // 100ms is short so let's just ignore the interrupt
850          Threads.sleepWithoutInterrupt(100);
851        }
852      }
853      return false;
854    }
855
856    private void createRSGroupTable() throws IOException {
857      OptionalLong optProcId = masterServices.getProcedures().stream()
858        .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p)
859        .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId)
860        .findFirst();
861      long procId;
862      if (optProcId.isPresent()) {
863        procId = optProcId.getAsLong();
864      } else {
865        procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
866      }
867      // wait for region to be online
868      int tries = 600;
869      while (
870        !(masterServices.getMasterProcedureExecutor().isFinished(procId))
871          && masterServices.getMasterProcedureExecutor().isRunning() && tries > 0
872      ) {
873        try {
874          Thread.sleep(100);
875        } catch (InterruptedException e) {
876          throw new IOException("Wait interrupted ", e);
877        }
878        tries--;
879      }
880      if (tries <= 0) {
881        throw new IOException("Failed to create group table in a given time.");
882      } else {
883        Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
884        if (result != null && result.isFailed()) {
885          throw new IOException(
886            "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
887        }
888      }
889    }
890
891    public boolean isOnline() {
892      return online;
893    }
894  }
895
896  private static boolean isMasterRunning(MasterServices masterServices) {
897    return !masterServices.isAborted() && !masterServices.isStopped();
898  }
899
900  private void multiMutate(List<Mutation> mutations) throws IOException {
901    try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
902      CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY);
903      MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder =
904        MultiRowMutationProtos.MutateRowsRequest.newBuilder();
905      for (Mutation mutation : mutations) {
906        if (mutation instanceof Put) {
907          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
908            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
909            mutation));
910        } else if (mutation instanceof Delete) {
911          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
912            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE,
913            mutation));
914        } else {
915          throw new DoNotRetryIOException(
916            "multiMutate doesn't support " + mutation.getClass().getName());
917        }
918      }
919
920      MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
921        MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
922      try {
923        service.mutateRows(null, mmrBuilder.build());
924      } catch (ServiceException ex) {
925        ProtobufUtil.toIOException(ex);
926      }
927    }
928  }
929
930  private void checkGroupName(String groupName) throws ConstraintException {
931    if (!groupName.matches("[a-zA-Z0-9_]+")) {
932      throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
933    }
934  }
935}