001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.rsgroup; 019 020import com.google.protobuf.ServiceException; 021import java.io.ByteArrayInputStream; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.NavigableSet; 031import java.util.OptionalLong; 032import java.util.Set; 033import java.util.SortedSet; 034import java.util.TreeSet; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.Coprocessor; 037import org.apache.hadoop.hbase.DoNotRetryIOException; 038import org.apache.hadoop.hbase.NamespaceDescriptor; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder; 044import org.apache.hadoop.hbase.client.Delete; 045import org.apache.hadoop.hbase.client.Get; 046import 
org.apache.hadoop.hbase.client.Mutation; 047import org.apache.hadoop.hbase.client.Put; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.ResultScanner; 050import org.apache.hadoop.hbase.client.Scan; 051import org.apache.hadoop.hbase.client.Table; 052import org.apache.hadoop.hbase.client.TableDescriptor; 053import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 054import org.apache.hadoop.hbase.constraint.ConstraintException; 055import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 056import org.apache.hadoop.hbase.exceptions.DeserializationException; 057import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 058import org.apache.hadoop.hbase.master.ClusterSchema; 059import org.apache.hadoop.hbase.master.MasterServices; 060import org.apache.hadoop.hbase.master.ServerListener; 061import org.apache.hadoop.hbase.master.TableStateManager; 062import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure; 063import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; 064import org.apache.hadoop.hbase.net.Address; 065import org.apache.hadoop.hbase.procedure2.Procedure; 066import org.apache.hadoop.hbase.protobuf.ProtobufMagic; 067import org.apache.hadoop.hbase.protobuf.ProtobufUtil; 068import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos; 069import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos; 070import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; 071import org.apache.hadoop.hbase.util.Bytes; 072import org.apache.hadoop.hbase.util.Threads; 073import org.apache.hadoop.hbase.zookeeper.ZKUtil; 074import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 075import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 076import org.apache.hadoop.util.Shell; 077import org.apache.yetus.audience.InterfaceAudience; 078import org.apache.zookeeper.KeeperException; 079import org.slf4j.Logger; 080import org.slf4j.LoggerFactory; 081 082import 
org.apache.hbase.thirdparty.com.google.common.collect.Lists; 083import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 084import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 085 086/** 087 * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the 088 * persistence store for the group information. It also makes use of zookeeper to store group 089 * information needed for bootstrapping during offline mode. 090 * <h2>Concurrency</h2> RSGroup state is kept locally in Maps. There is a rsgroup name to cached 091 * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong 092 * to (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in 093 * zk) on each modification. 094 * <p> 095 * Mutations on state are synchronized but reads can continue without having to wait on an instance 096 * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of 097 * state are read-only, just-in-case (see flushConfig). 098 * <p> 099 * Reads must not block else there is a danger we'll deadlock. 100 * <p> 101 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act 102 * on the results of the query modifying cache in zookeeper without another thread making 103 * intermediate modifications. These clients synchronize on the 'this' instance so no other has 104 * access concurrently. Reads must be able to continue concurrently. 
105 */ 106@InterfaceAudience.Private 107final class RSGroupInfoManagerImpl implements RSGroupInfoManager { 108 private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class); 109 110 /** Table descriptor for <code>hbase:rsgroup</code> catalog table */ 111 private static final TableDescriptor RSGROUP_TABLE_DESC; 112 static { 113 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME) 114 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES)) 115 .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName()); 116 try { 117 builder.setCoprocessor( 118 CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName()) 119 .setPriority(Coprocessor.PRIORITY_SYSTEM).build()); 120 } catch (IOException ex) { 121 throw new Error(ex); 122 } 123 RSGROUP_TABLE_DESC = builder.build(); 124 } 125 126 // There two Maps are immutable and wholesale replaced on each modification 127 // so are safe to access concurrently. See class comment. 
private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
private volatile Map<TableName, String> tableMap = Collections.emptyMap();

private final MasterServices masterServices;
private final Connection conn;
private final ZKWatcher watcher;
private final RSGroupStartupWorker rsGroupStartupWorker;
// contains list of groups that were last flushed to persistent store
private Set<String> prevRSGroups = new HashSet<>();
private final ServerEventsListenerThread serverEventsListenerThread =
  new ServerEventsListenerThread();

/** Get rsgroup table mapping script */
RSGroupMappingScript script;

/**
 * Runs an externally configured script to map a table to an rsgroup name. Package visibility for
 * testing.
 */
static class RSGroupMappingScript {

  static final String RS_GROUP_MAPPING_SCRIPT = "hbase.rsgroup.table.mapping.script";
  static final String RS_GROUP_MAPPING_SCRIPT_TIMEOUT =
    "hbase.rsgroup.table.mapping.script.timeout";

  private final String script;
  private final long scriptTimeout;

  RSGroupMappingScript(Configuration conf) {
    script = conf.get(RS_GROUP_MAPPING_SCRIPT);
    scriptTimeout = conf.getLong(RS_GROUP_MAPPING_SCRIPT_TIMEOUT, 5000); // 5 seconds
  }

  /**
   * Executes the configured script with the namespace and table name as its two arguments.
   * @return the trimmed script output, or {@code null} when no script is configured or the
   *         execution fails with an {@link IOException}
   */
  String getRSGroup(String namespace, String tablename) {
    if (script == null || script.isEmpty()) {
      return null;
    }
    Shell.ShellCommandExecutor rsgroupMappingScript =
      new Shell.ShellCommandExecutor(new String[] { script, "", "" }, null, null, scriptTimeout);

    // Fill the two placeholder arguments on the executor's own command array.
    String[] exec = rsgroupMappingScript.getExecString();
    exec[1] = namespace;
    exec[2] = tablename;
    try {
      rsgroupMappingScript.execute();
    } catch (IOException e) {
      LOG.error("Failed to get RSGroup from script for table {}:{}", namespace, tablename, e);
      return null;
    }
    return rsgroupMappingScript.getOutput().trim();
  }
}

private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
  this.masterServices = masterServices;
  this.watcher = masterServices.getZooKeeper();
  this.conn = masterServices.getConnection();
  this.rsGroupStartupWorker = new RSGroupStartupWorker();
  script = new RSGroupMappingScript(masterServices.getConfiguration());
}

/**
 * Loads the initial group state, then starts the server-events thread and registers it as a
 * listener with the server manager.
 */
private synchronized void init() throws IOException {
  refresh();
  serverEventsListenerThread.start();
  masterServices.getServerManager().registerListener(serverEventsListenerThread);
}

/** Creates a manager for the given master and initializes it before returning it. */
static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
  RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master);
  instance.init();
  return instance;
}

public void start() {
  // create system table of rsgroup
  rsGroupStartupWorker.start();
}

/**
 * Adds a new rsgroup and flushes the updated group map.
 * @throws DoNotRetryIOException if a group with that name is already cached or the name is the
 *                               default group's name
 */
@Override
public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
  checkGroupName(rsGroupInfo.getName());
  if (
    rsGroupMap.get(rsGroupInfo.getName()) != null
      || rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)
  ) {
    throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName());
  }
  Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
  newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
  flushConfig(newGroupMap);
}

/**
 * Like {@link #getRSGroup(String)} but fails instead of returning {@code null}.
 * @throws DoNotRetryIOException if no group with the given name is cached
 */
private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException {
  RSGroupInfo rsGroupInfo = getRSGroup(groupName);
  if (rsGroupInfo == null) {
    throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist");
  }
  return rsGroupInfo;
}

/**
 * @param master the master to get online servers for
 * @return Set of online Servers named for their hostname and port (not ServerName).
 */
private static Set<Address> getOnlineServers(final MasterServices master) {
  Set<Address> onlineServers = new HashSet<>();
  if (master == null) {
    return onlineServers;
  }

  for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
    onlineServers.add(server.getAddress());
  }
  return onlineServers;
}

/**
 * Moves servers from {@code srcGroup} to {@code dstGroup} and flushes the result.
 * @return the servers that were actually added to the destination group
 * @throws DoNotRetryIOException if either group does not exist
 */
@Override
public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
  String dstGroup) throws IOException {
  // NOTE(review): src and dst are the instances held by rsGroupMap, so the loop below changes
  // the cached objects before flushConfig runs.
  RSGroupInfo src = getRSGroupInfo(srcGroup);
  RSGroupInfo dst = getRSGroupInfo(dstGroup);
  Set<Address> movedServers = new HashSet<>();
  // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
  // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
  // rsgroup of dead servers that are to come back later).
  Set<Address> onlineServers = dst.getName().equals(RSGroupInfo.DEFAULT_GROUP)
    ? getOnlineServers(this.masterServices)
    : null;
  for (Address el : servers) {
    src.removeServer(el);
    if (onlineServers != null && !onlineServers.contains(el)) {
      LOG.debug("Dropping {} during move-to-default rsgroup because not online", el);
      continue;
    }
    dst.addServer(el);
    movedServers.add(el);
  }
  Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
  newGroupMap.put(src.getName(), src);
  newGroupMap.put(dst.getName(), dst);
  flushConfig(newGroupMap);
  return movedServers;
}

/** Returns the first cached group that contains the server, or {@code null} if none does. */
@Override
public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
  for (RSGroupInfo info : rsGroupMap.values()) {
    if (info.containsServer(serverHostPort)) {
      return info;
    }
  }
  return null;
}

@Override
public RSGroupInfo getRSGroup(String groupName) {
  return rsGroupMap.get(groupName);
}

@Override
public String getRSGroupOfTable(TableName tableName) {
  return tableMap.get(tableName);
}

/**
 * Moves tables into {@code groupName}; a {@code null} group name only removes the tables from
 * the groups they currently belong to. Works on copies of the affected groups.
 * @throws DoNotRetryIOException if a non-null destination group does not exist
 */
@Override
public synchronized void moveTables(Set<TableName> tableNames, String groupName)
  throws IOException {
  // Check if rsGroup contains the destination rsgroup
  if (groupName != null && !rsGroupMap.containsKey(groupName)) {
    throw new DoNotRetryIOException("Group " + groupName + " does not exist");
  }

  // Make a copy of rsGroupMap to update
  Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);

  // Remove tables from their original rsgroups
  // and update the copy of rsGroupMap
  for (TableName tableName : tableNames) {
    if (tableMap.containsKey(tableName)) {
      RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
      src.removeTable(tableName);
      newGroupMap.put(src.getName(), src);
    }
  }

  // Add tables to the destination rsgroup
  // and update the copy of rsGroupMap
  if (groupName != null) {
    RSGroupInfo dstGroup = new RSGroupInfo(newGroupMap.get(groupName));
    dstGroup.addAllTables(tableNames);
    newGroupMap.put(dstGroup.getName(), dstGroup);
  }

  // Flush according to the updated copy of rsGroupMap
  flushConfig(newGroupMap);
}

/**
 * Removes a group and flushes the result.
 * @throws DoNotRetryIOException if the group is unknown or is the default group
 */
@Override
public synchronized void removeRSGroup(String groupName) throws IOException {
  if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
    throw new DoNotRetryIOException(
      "Group " + groupName + " does not exist or is a reserved " + "group");
  }
  Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
  newGroupMap.remove(groupName);
  flushConfig(newGroupMap);
}

/** Returns a new list holding the currently cached groups. */
@Override
public List<RSGroupInfo> listRSGroups() {
  return Lists.newLinkedList(rsGroupMap.values());
}

@Override
public boolean isOnline() {
  return rsGroupStartupWorker.isOnline();
}

/**
 * Moves the given servers and tables from {@code srcGroup} to {@code dstGroup} and flushes the
 * result. Synchronized like the other mutators so the read-modify-flush sequence runs under the
 * instance monitor.
 * @throws DoNotRetryIOException if either group does not exist
 */
@Override
public synchronized void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
  String srcGroup, String dstGroup) throws IOException {
  // get server's group
  RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup);
  RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup);

  // move servers
  for (Address el : servers) {
    srcGroupInfo.removeServer(el);
    dstGroupInfo.addServer(el);
  }
  // move tables
  for (TableName tableName : tables) {
    srcGroupInfo.removeTable(tableName);
    dstGroupInfo.addTable(tableName);
  }

  // flush changed groupinfo
  Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
  newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
  newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
  flushConfig(newGroupMap);
}

/**
 * Removes each server from the group that currently contains it and flushes when at least one
 * group was touched. Servers that belong to no group are logged and skipped.
 */
@Override
public synchronized void removeServers(Set<Address> servers) throws IOException {
  Map<String, RSGroupInfo> rsGroupInfos = new HashMap<>();
  for (Address el : servers) {
    RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
    if (rsGroupInfo == null) {
      LOG.warn("Server {} does not belong to any rsgroup.", el);
      continue;
    }
    // Prefer the entry already collected for this group name, if any.
    RSGroupInfo collected = rsGroupInfos.get(rsGroupInfo.getName());
    RSGroupInfo target = collected != null ? collected : rsGroupInfo;
    target.removeServer(el);
    rsGroupInfos.put(target.getName(), target);
  }

  if (!rsGroupInfos.isEmpty()) {
    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.putAll(rsGroupInfos);
    flushConfig(newGroupMap);
  }
}

/**
 * Replaces the group {@code oldName} with a new group {@code newName} carrying the same servers
 * and tables, then flushes. Synchronized like the other mutators so the existence checks and
 * the flush see the same map.
 * @throws ConstraintException if the old group is the default group or does not exist, or if a
 *                             group named {@code newName} already exists
 */
@Override
public synchronized void renameRSGroup(String oldName, String newName) throws IOException {
  checkGroupName(oldName);
  checkGroupName(newName);
  if (oldName.equals(RSGroupInfo.DEFAULT_GROUP)) {
    throw new ConstraintException("Can't rename default rsgroup");
  }
  RSGroupInfo oldGroup = getRSGroup(oldName);
  if (oldGroup == null) {
    throw new ConstraintException("RSGroup " + oldName + " does not exist");
  }
  if (rsGroupMap.containsKey(newName)) {
    throw new ConstraintException("Group already exists: " + newName);
  }

  Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
  newGroupMap.remove(oldName);
  RSGroupInfo newGroup =
    new RSGroupInfo(newName, (SortedSet<Address>) oldGroup.getServers(), oldGroup.getTables());
  newGroupMap.put(newName, newGroup);
  flushConfig(newGroupMap);
}

/**
 * Will try to get the rsgroup from {@code tableMap} first then try to get the rsgroup from
 * {@code script} try to get the rsgroup from the {@link NamespaceDescriptor} lastly. If still not
 * present, return default group.
 */
@Override
public RSGroupInfo determineRSGroupInfoForTable(TableName tableName) throws IOException {
  RSGroupInfo groupFromOldRSGroupInfo = getRSGroup(getRSGroupOfTable(tableName));
  if (groupFromOldRSGroupInfo != null) {
    return groupFromOldRSGroupInfo;
  }
  // RSGroup information determined by administrator.
  RSGroupInfo groupDeterminedByAdmin = getRSGroup(
    script.getRSGroup(tableName.getNamespaceAsString(), tableName.getQualifierAsString()));
  if (groupDeterminedByAdmin != null) {
    return groupDeterminedByAdmin;
  }
  // Finally, we will try to fall back to namespace as rsgroup if exists
  ClusterSchema clusterSchema = masterServices.getClusterSchema();
  if (clusterSchema == null) {
    if (TableName.isMetaTableName(tableName)) {
      LOG.info("Can not get the namespace rs group config for meta table, since the"
        + " meta table is not online yet, will use default group to assign meta first");
    } else {
      LOG.warn("ClusterSchema is null, can only use default rsgroup, should not happen?");
    }
  } else {
    NamespaceDescriptor nd = clusterSchema.getNamespace(tableName.getNamespaceAsString());
    RSGroupInfo groupNameOfNs =
      getRSGroup(nd.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP));
    if (groupNameOfNs != null) {
      return groupNameOfNs;
    }
  }
  return getRSGroup(RSGroupInfo.DEFAULT_GROUP);
}

/**
 * Replaces the whole configuration of a group with {@code configuration} and flushes.
 * Synchronized like the other mutators so the clear-then-set sequence and the flush run under
 * the instance monitor.
 * @throws ConstraintException   if the group is the default group
 * @throws DoNotRetryIOException if the group does not exist
 */
@Override
public synchronized void updateRSGroupConfig(String groupName,
  Map<String, String> configuration) throws IOException {
  if (RSGroupInfo.DEFAULT_GROUP.equals(groupName)) {
    // We do not persist anything of default group, therefore, it is not supported to update
    // default group's configuration which lost once master down.
    throw new ConstraintException(
      "configuration of " + RSGroupInfo.DEFAULT_GROUP + " can't be stored persistently");
  }
  RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName);
  // Iterate over a copy of the keys because removeConfiguration is called while iterating.
  new HashSet<>(rsGroupInfo.getConfiguration().keySet())
    .forEach(rsGroupInfo::removeConfiguration);
  configuration.forEach(rsGroupInfo::setConfiguration);
  flushConfig();
}

/** Scans the rsgroup table and parses one {@link RSGroupInfo} per returned row. */
List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
  List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
  try (Table table = conn.getTable(RSGROUP_TABLE_NAME);
    ResultScanner scanner = table.getScanner(new Scan())) {
    Result result;
    while ((result = scanner.next()) != null) {
      RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
        .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
      rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
    }
  }
  return rsGroupInfoList;
}

/**
 * Reads the groups stored as children of the rsgroup znode. Returns an empty list when the znode
 * does not exist or has no children list.
 * @throws IOException if reading or parsing fails, or if the thread is interrupted (the
 *                     interrupt flag is restored before throwing)
 */
List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
  String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
  List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
  // Overwrite any info stored by table, this takes precedence
  try {
    if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
      List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
      if (children == null) {
        return rsGroupInfoList;
      }
      for (String znode : children) {
        byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
        if (data != null && data.length > 0) {
          ProtobufUtil.expectPBMagicPrefix(data);
          // Third argument is a length, not an end offset: read only what follows the magic.
          int magicLen = ProtobufUtil.lengthOfPBMagic();
          ByteArrayInputStream bis =
            new ByteArrayInputStream(data, magicLen, data.length - magicLen);
          rsGroupInfoList
            .add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
        }
      }
      LOG.debug("Read ZK GroupInfo count:{}", rsGroupInfoList.size());
    }
  } catch (InterruptedException e) {
    // Keep the interrupt visible to the caller's thread instead of swallowing it.
    Thread.currentThread().interrupt();
    throw new IOException("Failed to read rsGroupZNode", e);
  } catch (KeeperException | DeserializationException e) {
    throw new IOException("Failed to read rsGroupZNode", e);
  }
  return rsGroupInfoList;
}

@Override
public void refresh() throws IOException {
  refresh(false);
}

/**
 * Rebuilds the cached group and table maps. Groups are read from the rsgroup table when
 * {@code forceOnline} is set or the manager is online, otherwise from zookeeper. The default
 * group is always recomputed from the tables not claimed by any other group and the servers
 * returned by {@code getDefaultServers}.
 */
private synchronized void refresh(boolean forceOnline) throws IOException {
  List<RSGroupInfo> groupList = new LinkedList<>();

  // Overwrite anything read from zk, group table is source of truth
  // if online read from GROUP table
  if (forceOnline || isOnline()) {
    LOG.debug("Refreshing in Online mode.");
    groupList.addAll(retrieveGroupListFromGroupTable());
  } else {
    LOG.debug("Refreshing in Offline mode.");
    groupList.addAll(retrieveGroupListFromZookeeper());
  }

  // refresh default group, prune
  NavigableSet<TableName> orphanTables = new TreeSet<>();
  for (String entry : masterServices.getTableDescriptors().getAll().keySet()) {
    orphanTables.add(TableName.valueOf(entry));
  }
  for (RSGroupInfo group : groupList) {
    if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
      orphanTables.removeAll(group.getTables());
    }
  }

  // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
  // from region group table or zk
  groupList
    .add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(groupList), orphanTables));

  // populate the data
  HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
  HashMap<TableName, String> newTableMap = Maps.newHashMap();
  for (RSGroupInfo group : groupList) {
    newGroupMap.put(group.getName(), group);
    for (TableName table : group.getTables()) {
      newTableMap.put(table, group.getName());
    }
  }
  resetRSGroupAndTableMaps(newGroupMap, newTableMap);
  updateCacheOfRSGroups(rsGroupMap.keySet());
}

/**
 * Builds the Delete/Put mutations for {@code groupMap} (deleting groups that were flushed last
 * time but are no longer present) and hands them to {@code multiMutate}.
 * @return the table-to-group-name map derived from {@code groupMap}
 */
private synchronized Map<TableName, String> flushConfigTable(Map<String, RSGroupInfo> groupMap)
  throws IOException {
  Map<TableName, String> newTableMap = Maps.newHashMap();
  List<Mutation> mutations = Lists.newArrayList();

  // populate deletes
  for (String groupName : prevRSGroups) {
    if (!groupMap.containsKey(groupName)) {
      Delete d = new Delete(Bytes.toBytes(groupName));
      mutations.add(d);
    }
  }

  // populate puts
  for (RSGroupInfo group : groupMap.values()) {
    RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(group);
    Put p = new Put(Bytes.toBytes(group.getName()));
    p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
    mutations.add(p);
    for (TableName entry : group.getTables()) {
      newTableMap.put(entry, group.getName());
    }
  }

  if (!mutations.isEmpty()) {
    multiMutate(mutations);
  }
  return newTableMap;
}

/** Flushes the currently cached group map. */
private synchronized void flushConfig() throws IOException {
  flushConfig(this.rsGroupMap);
}

/**
 * Persists {@code newGroupMap} and makes it the cached state. While offline nothing is
 * persisted and only a change to the servers of the default group is accepted.
 * @throws IOException if an offline update touches anything but the default group's servers, or
 *                     if writing to zookeeper fails (the master is asked to abort in that case)
 */
private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
  Map<TableName, String> newTableMap;

  // For offline mode persistence is still unavailable
  // We're refreshing in-memory state but only for servers in default group
  if (!isOnline()) {
    if (newGroupMap == this.rsGroupMap) {
      // When newGroupMap is this.rsGroupMap itself,
      // do not need to check default group and other groups as followed
      return;
    }

    Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(rsGroupMap);
    RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
    RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
    if (
      !oldGroupMap.equals(newGroupMap) // compare both tables and servers in other groups
        // compare tables in default group
        || !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())
    ) {
      throw new IOException("Only servers in default group can be updated during offline mode");
    }

    // Restore newGroupMap by putting its default group back
    newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);

    // Refresh rsGroupMap
    // according to the inputted newGroupMap (an updated copy of rsGroupMap)
    rsGroupMap = newGroupMap;

    // Do not need to update tableMap
    // because only the update on servers in default group is allowed above,
    // or IOException will be thrown
    return;
  }

  // For online mode, flush through flushConfigTable first, then mirror the groups to zookeeper
  newTableMap = flushConfigTable(newGroupMap);

  // Make changes visible after having been persisted to the source of truth
  resetRSGroupAndTableMaps(newGroupMap, newTableMap);

  try {
    String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
    ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);

    List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
    for (String groupName : prevRSGroups) {
      if (!newGroupMap.containsKey(groupName)) {
        String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
        zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
      }
    }

    for (RSGroupInfo group : newGroupMap.values()) {
      String znode = ZNodePaths.joinZNode(groupBasePath, group.getName());
      RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(group);
      LOG.debug("Updating znode: {}", znode);
      ZKUtil.createAndFailSilent(watcher, znode);
      zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
      zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
        ProtobufUtil.prependPBMagic(proto.toByteArray())));
    }
    LOG.debug("Writing ZK GroupInfo count: {}", zkOps.size());

    ZKUtil.multiOrSequential(watcher, zkOps, false);
  } catch (KeeperException e) {
    LOG.error("Failed to write to rsGroupZNode", e);
    masterServices.abort("Failed to write to rsGroupZNode", e);
    throw new IOException("Failed to write to rsGroupZNode", e);
  }
  updateCacheOfRSGroups(newGroupMap.keySet());
}

/**
 * Make changes visible. Caller must be synchronized on 'this'.
 */
private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap,
  Map<TableName, String> newTableMap) {
  // Make maps Immutable.
  this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
  this.tableMap = Collections.unmodifiableMap(newTableMap);
}

/**
 * Update cache of rsgroups. Caller must be synchronized on 'this'.
 * @param currentGroups Current list of Groups.
 */
private void updateCacheOfRSGroups(final Set<String> currentGroups) {
  this.prevRSGroups.clear();
  this.prevRSGroups.addAll(currentGroups);
}

// Called by getDefaultServers. Presume it has lock in place.
private List<ServerName> getOnlineRS() throws IOException {
  if (masterServices != null) {
    return masterServices.getServerManager().getOnlineServersList();
  }
  LOG.debug("Reading online RS from zookeeper");
  List<ServerName> servers = new LinkedList<>();
  try {
    for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
      servers.add(ServerName.parseServerName(el));
    }
  } catch (KeeperException e) {
    throw new IOException("Failed to retrieve server list from zookeeper", e);
  }
  return servers;
}

// Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
private SortedSet<Address> getDefaultServers() throws IOException {
  return getDefaultServers(listRSGroups());
}

// Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
721 private SortedSet<Address> getDefaultServers(List<RSGroupInfo> rsGroupInfoList) 722 throws IOException { 723 // Build a list of servers in other groups than default group, from rsGroupMap 724 Set<Address> serversInOtherGroup = new HashSet<>(); 725 for (RSGroupInfo group : rsGroupInfoList) { 726 if (!RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) { // not default group 727 serversInOtherGroup.addAll(group.getServers()); 728 } 729 } 730 731 // Get all online servers from Zookeeper and find out servers in default group 732 SortedSet<Address> defaultServers = Sets.newTreeSet(); 733 for (ServerName serverName : getOnlineRS()) { 734 Address server = Address.fromParts(serverName.getHostname(), serverName.getPort()); 735 if (!serversInOtherGroup.contains(server)) { // not in other groups 736 defaultServers.add(server); 737 } 738 } 739 return defaultServers; 740 } 741 742 // Called by ServerEventsListenerThread. Synchronize on this because redoing 743 // the rsGroupMap then writing it out. 744 private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException { 745 RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP); 746 RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables()); 747 HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); 748 newGroupMap.put(newInfo.getName(), newInfo); 749 flushConfig(newGroupMap); 750 } 751 752 /** 753 * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known 754 * servers. Notifications about server changes are received by registering {@link ServerListener}. 755 * As a listener, we need to return immediately, so the real work of updating the servers is done 756 * asynchronously in this thread. 
757 */ 758 private class ServerEventsListenerThread extends Thread implements ServerListener { 759 private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class); 760 private boolean changed = false; 761 762 ServerEventsListenerThread() { 763 setDaemon(true); 764 } 765 766 @Override 767 public void serverAdded(ServerName serverName) { 768 serverChanged(); 769 } 770 771 @Override 772 public void serverRemoved(ServerName serverName) { 773 serverChanged(); 774 } 775 776 private synchronized void serverChanged() { 777 changed = true; 778 this.notify(); 779 } 780 781 @Override 782 public void run() { 783 setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName()); 784 SortedSet<Address> prevDefaultServers = new TreeSet<>(); 785 while (isMasterRunning(masterServices)) { 786 try { 787 LOG.info("Updating default servers."); 788 SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers(); 789 if (!servers.equals(prevDefaultServers)) { 790 RSGroupInfoManagerImpl.this.updateDefaultServers(servers); 791 prevDefaultServers = servers; 792 LOG.info("Updated with servers: " + servers.size()); 793 } 794 try { 795 synchronized (this) { 796 while (!changed) { 797 wait(); 798 } 799 changed = false; 800 } 801 } catch (InterruptedException e) { 802 LOG.warn("Interrupted", e); 803 } 804 } catch (IOException e) { 805 LOG.warn("Failed to update default servers", e); 806 } 807 } 808 } 809 } 810 811 private class RSGroupStartupWorker extends Thread { 812 private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class); 813 private volatile boolean online = false; 814 815 RSGroupStartupWorker() { 816 super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName()); 817 setDaemon(true); 818 } 819 820 @Override 821 public void run() { 822 if (waitForGroupTableOnline()) { 823 LOG.info("GroupBasedLoadBalancer is now online"); 824 } else { 825 LOG.warn("Quit without making region group table 
online"); 826 } 827 } 828 829 private boolean waitForGroupTableOnline() { 830 while (isMasterRunning(masterServices)) { 831 try { 832 TableStateManager tsm = masterServices.getTableStateManager(); 833 if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) { 834 createRSGroupTable(); 835 } 836 // try reading from the table 837 try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) { 838 table.get(new Get(ROW_KEY)); 839 } 840 LOG.info( 841 "RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information"); 842 RSGroupInfoManagerImpl.this.refresh(true); 843 online = true; 844 // flush any inconsistencies between ZK and HTable 845 RSGroupInfoManagerImpl.this.flushConfig(); 846 return true; 847 } catch (Exception e) { 848 LOG.warn("Failed to perform check", e); 849 // 100ms is short so let's just ignore the interrupt 850 Threads.sleepWithoutInterrupt(100); 851 } 852 } 853 return false; 854 } 855 856 private void createRSGroupTable() throws IOException { 857 OptionalLong optProcId = masterServices.getProcedures().stream() 858 .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p) 859 .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId) 860 .findFirst(); 861 long procId; 862 if (optProcId.isPresent()) { 863 procId = optProcId.getAsLong(); 864 } else { 865 procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC); 866 } 867 // wait for region to be online 868 int tries = 600; 869 while ( 870 !(masterServices.getMasterProcedureExecutor().isFinished(procId)) 871 && masterServices.getMasterProcedureExecutor().isRunning() && tries > 0 872 ) { 873 try { 874 Thread.sleep(100); 875 } catch (InterruptedException e) { 876 throw new IOException("Wait interrupted ", e); 877 } 878 tries--; 879 } 880 if (tries <= 0) { 881 throw new IOException("Failed to create group table in a given time."); 882 } else { 883 Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId); 884 if (result != 
null && result.isFailed()) { 885 throw new IOException( 886 "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result)); 887 } 888 } 889 } 890 891 public boolean isOnline() { 892 return online; 893 } 894 } 895 896 private static boolean isMasterRunning(MasterServices masterServices) { 897 return !masterServices.isAborted() && !masterServices.isStopped(); 898 } 899 900 private void multiMutate(List<Mutation> mutations) throws IOException { 901 try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) { 902 CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY); 903 MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder = 904 MultiRowMutationProtos.MutateRowsRequest.newBuilder(); 905 for (Mutation mutation : mutations) { 906 if (mutation instanceof Put) { 907 mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( 908 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, 909 mutation)); 910 } else if (mutation instanceof Delete) { 911 mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( 912 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE, 913 mutation)); 914 } else { 915 throw new DoNotRetryIOException( 916 "multiMutate doesn't support " + mutation.getClass().getName()); 917 } 918 } 919 920 MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service = 921 MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel); 922 try { 923 service.mutateRows(null, mmrBuilder.build()); 924 } catch (ServiceException ex) { 925 ProtobufUtil.toIOException(ex); 926 } 927 } 928 } 929 930 private void checkGroupName(String groupName) throws ConstraintException { 931 if (!groupName.matches("[a-zA-Z0-9_]+")) { 932 throw new ConstraintException("RSGroup name should only contain alphanumeric characters"); 933 } 934 } 935}