/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
import static org.apache.hadoop.hbase.util.Addressing.inetSocketAddress2String;

import java.io.Closeable;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ConnectStringParser;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * HBase Canary Tool for "canary monitoring" of a running HBase cluster. There are three modes:
 * <ol>
 * <li>region mode (Default): For each region, try to get one row per column family outputting
 * information on failure (ERROR) or else the latency.</li>
 * <li>regionserver mode: For each regionserver try to get one row from one table selected randomly
 * outputting information on failure (ERROR) or else the latency.</li>
 * <li>zookeeper mode: for each zookeeper instance, selects a znode outputting information on
 * failure (ERROR) or else the latency.</li>
 * </ol>
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class CanaryTool implements Tool, Canary {
  /** Configuration key for the info server port; a negative value disables the web UI. */
  public static final String HBASE_CANARY_INFO_PORT = "hbase.canary.info.port";
  /** Configuration key for the address the info server binds to. */
  public static final String HBASE_CANARY_INFO_BINDADDRESS = "hbase.canary.info.bindAddress";

  /**
   * Starts the canary status web UI when a non-negative port is configured. The UI is only put up
   * in region mode; in zookeeper and regionserver mode an informational message is logged instead.
   * A failure to bind the port is logged as a warning and does not abort the canary.
   * @throws IOException if the info server fails for a reason other than a bind failure
   */
  private void putUpWebUI() throws IOException {
    int port = conf.getInt(HBASE_CANARY_INFO_PORT, -1);
    // -1 is for disabling info server
    if (port < 0) {
      return;
    }
    if (zookeeperMode) {
      LOG.info("WebUI is not supported in Zookeeper mode");
    } else if (regionServerMode) {
      LOG.info("WebUI is not supported in RegionServer mode");
    } else {
      String addr = conf.get(HBASE_CANARY_INFO_BINDADDRESS, "0.0.0.0");
      try {
        InfoServer infoServer = new InfoServer("canary", addr, port, false, conf);
        infoServer.addUnprivilegedServlet("canary", "/canary-status", CanaryStatusServlet.class);
        infoServer.setAttribute("sink", getSink(conf, RegionStdOutSink.class));
        infoServer.start();
        LOG.info("Bind Canary http info server to {}:{} ", addr, port);
      } catch (BindException e) {
        LOG.warn("Failed binding Canary http info server to {}:{}", addr, port, e);
      }
    }
  }

  /**
   * Runs the canary in region mode. If per-table read timeouts are configured they are parsed
   * first; a malformed value is logged and reported as a usage error.
   * @param targets the monitor targets handed to the monitor, may be {@code null}
   * @return the monitor's exit code, or {@code USAGE_EXIT_CODE} if the configured read table
   *         timeouts cannot be parsed
   */
  @Override
  public int checkRegions(String[] targets) throws Exception {
    String configuredReadTableTimeoutsStr = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT);
    try {
      LOG.info("Canary tool is running in Region mode");
      if (configuredReadTableTimeoutsStr != null) {
        populateReadTableTimeoutsMap(configuredReadTableTimeoutsStr);
      }
    } catch (IllegalArgumentException e) {
      LOG.error("Constructing read table timeouts map failed ", e);
      return USAGE_EXIT_CODE;
    }
    return runMonitor(targets);
  }

  /**
   * Switches the tool to regionserver mode and runs the monitor.
   * @param targets the monitor targets handed to the monitor, may be {@code null}
   * @return the monitor's exit code
   */
  @Override
  public int checkRegionServers(String[] targets) throws Exception {
    regionServerMode = true;
    LOG.info("Canary tool is running in RegionServer mode");
    return runMonitor(targets);
  }

  /**
   * Switches the tool to zookeeper mode and runs the monitor without explicit targets.
   * @return the monitor's exit code
   */
  @Override
  public int checkZooKeeper() throws Exception {
    zookeeperMode = true;
    LOG.info("Canary tool is running in ZooKeeper mode");
    return runMonitor(null);
  }

  /**
   * Sink interface used by the canary to output information
   */
  public interface Sink {
    /** Returns the number of read failures recorded so far. */
    long getReadFailureCount();

    /** Records one more read failure and returns the updated count. */
    long incReadFailureCount();

    /** Returns the recorded read failures, keyed by region name with the server name as value. */
    Map<String, String> getReadFailures();

    /** Records that a read of {@code regionName} failed on {@code serverName}. */
    void updateReadFailures(String regionName, String serverName);

    /** Returns the number of write failures recorded so far. */
    long getWriteFailureCount();

    /** Records one more write failure and returns the updated count. */
    long incWriteFailureCount();

    /** Returns the recorded write failures, keyed by region name with the server name as value. */
    Map<String, String> getWriteFailures();

    /** Records that a write to {@code regionName} failed on {@code serverName}. */
    void updateWriteFailures(String regionName, String serverName);

    /** Returns the number of successful reads recorded so far. */
    long getReadSuccessCount();

    /** Records one more successful read and returns the updated count. */
    long incReadSuccessCount();

    /** Returns the number of successful writes recorded so far. */
    long getWriteSuccessCount();

    /** Records one more successful write and returns the updated count. */
    long incWriteSuccessCount();
  }

  /**
   * Simple implementation of canary sink that allows plotting to a file or standard output.
   * Counters are kept in {@link AtomicLong}s and failures in {@link ConcurrentHashMap}s; the
   * references themselves are final so they are never replaced after construction.
   */
  public static class StdOutSink implements Sink {
    private final AtomicLong readFailureCount = new AtomicLong(0);
    private final AtomicLong writeFailureCount = new AtomicLong(0);
    private final AtomicLong readSuccessCount = new AtomicLong(0);
    private final AtomicLong writeSuccessCount = new AtomicLong(0);
    private final Map<String, String> readFailures = new ConcurrentHashMap<>();
    private final Map<String, String> writeFailures = new ConcurrentHashMap<>();

    @Override
    public long getReadFailureCount() {
      return readFailureCount.get();
    }

    @Override
    public long incReadFailureCount() {
      return readFailureCount.incrementAndGet();
    }

    @Override
    public Map<String, String> getReadFailures() {
      return readFailures;
    }

    @Override
    public void updateReadFailures(String regionName, String serverName) {
      readFailures.put(regionName, serverName);
    }

    @Override
    public long getWriteFailureCount() {
      return writeFailureCount.get();
    }

    @Override
    public long incWriteFailureCount() {
      return writeFailureCount.incrementAndGet();
    }

    @Override
    public Map<String, String> getWriteFailures() {
      return writeFailures;
    }

    @Override
    public void updateWriteFailures(String regionName, String serverName) {
      writeFailures.put(regionName, serverName);
    }

    @Override
    public long getReadSuccessCount() {
      return readSuccessCount.get();
    }

    @Override
    public long incReadSuccessCount() {
      return readSuccessCount.incrementAndGet();
    }

    @Override
    public long getWriteSuccessCount() {
      return writeSuccessCount.get();
    }

    @Override
    public long incWriteSuccessCount() {
      return writeSuccessCount.incrementAndGet();
    }
  }

  /**
   * By RegionServer, for 'regionserver' mode.
   */
  public static class RegionServerStdOutSink extends StdOutSink {
    /** Counts a read failure and logs it at ERROR level. */
    public void publishReadFailure(String table, String server) {
      incReadFailureCount();
      LOG.error("Read from {} on {}", table, server);
    }

    /** Logs the latency of a successful read at INFO level. */
    public void publishReadTiming(String table, String server, long msTime) {
      LOG.info("Read from {} on {} in {}ms", table, server, msTime);
    }
  }

  /**
   * Output for 'zookeeper' mode.
   */
  public static class ZookeeperStdOutSink extends StdOutSink {
    /** Counts a read failure and logs it at ERROR level. */
    public void publishReadFailure(String znode, String server) {
      incReadFailureCount();
      LOG.error("Read from {} on {}", znode, server);
    }

    /** Logs the latency of a successful znode read at INFO level. */
    public void publishReadTiming(String znode, String server, long msTime) {
      LOG.info("Read from {} on {} in {}ms", znode, server, msTime);
    }
  }

  /**
   * By Region, for 'region' mode.
303 */ 304 public static class RegionStdOutSink extends StdOutSink { 305 private Map<String, LongAdder> perTableReadLatency = new HashMap<>(); 306 private LongAdder writeLatency = new LongAdder(); 307 private final ConcurrentMap<String, List<RegionTaskResult>> regionMap = 308 new ConcurrentHashMap<>(); 309 private ConcurrentMap<ServerName, LongAdder> perServerFailuresCount = new ConcurrentHashMap<>(); 310 private ConcurrentMap<String, LongAdder> perTableFailuresCount = new ConcurrentHashMap<>(); 311 312 public ConcurrentMap<ServerName, LongAdder> getPerServerFailuresCount() { 313 return perServerFailuresCount; 314 } 315 316 public ConcurrentMap<String, LongAdder> getPerTableFailuresCount() { 317 return perTableFailuresCount; 318 } 319 320 public void resetFailuresCountDetails() { 321 perServerFailuresCount.clear(); 322 perTableFailuresCount.clear(); 323 } 324 325 private void incFailuresCountDetails(ServerName serverName, RegionInfo region) { 326 if (serverName != null) { 327 perServerFailuresCount.compute(serverName, (server, count) -> { 328 if (count == null) { 329 count = new LongAdder(); 330 } 331 count.increment(); 332 return count; 333 }); 334 } 335 perTableFailuresCount.compute(region.getTable().getNameAsString(), (tableName, count) -> { 336 if (count == null) { 337 count = new LongAdder(); 338 } 339 count.increment(); 340 return count; 341 }); 342 } 343 344 public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) { 345 LOG.error("Read from {} on serverName={} failed", region.getRegionNameAsString(), serverName, 346 e); 347 incReadFailureCount(); 348 incFailuresCountDetails(serverName, region); 349 } 350 351 public void publishReadFailure(ServerName serverName, RegionInfo region, 352 ColumnFamilyDescriptor column, Exception e) { 353 LOG.error("Read from {} on serverName={}, columnFamily={} failed", 354 region.getRegionNameAsString(), serverName, column.getNameAsString(), e); 355 incReadFailureCount(); 356 
incFailuresCountDetails(serverName, region); 357 } 358 359 public void publishReadTiming(ServerName serverName, RegionInfo region, 360 ColumnFamilyDescriptor column, long msTime) { 361 RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column); 362 rtr.setReadSuccess(); 363 rtr.setReadLatency(msTime); 364 List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString()); 365 rtrs.add(rtr); 366 // Note that read success count will be equal to total column family read successes. 367 incReadSuccessCount(); 368 LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName, 369 column.getNameAsString(), msTime); 370 } 371 372 public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) { 373 LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e); 374 incWriteFailureCount(); 375 incFailuresCountDetails(serverName, region); 376 } 377 378 public void publishWriteFailure(ServerName serverName, RegionInfo region, 379 ColumnFamilyDescriptor column, Exception e) { 380 LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName, 381 column.getNameAsString(), e); 382 incWriteFailureCount(); 383 incFailuresCountDetails(serverName, region); 384 } 385 386 public void publishWriteTiming(ServerName serverName, RegionInfo region, 387 ColumnFamilyDescriptor column, long msTime) { 388 RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column); 389 rtr.setWriteSuccess(); 390 rtr.setWriteLatency(msTime); 391 List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString()); 392 rtrs.add(rtr); 393 // Note that write success count will be equal to total column family write successes. 
394 incWriteSuccessCount(); 395 LOG.info("Write to {} on {} {} in {}ms", region.getRegionNameAsString(), serverName, 396 column.getNameAsString(), msTime); 397 } 398 399 public Map<String, LongAdder> getReadLatencyMap() { 400 return this.perTableReadLatency; 401 } 402 403 public LongAdder initializeAndGetReadLatencyForTable(String tableName) { 404 LongAdder initLatency = new LongAdder(); 405 this.perTableReadLatency.put(tableName, initLatency); 406 return initLatency; 407 } 408 409 public void initializeWriteLatency() { 410 this.writeLatency.reset(); 411 } 412 413 public LongAdder getWriteLatency() { 414 return this.writeLatency; 415 } 416 417 public ConcurrentMap<String, List<RegionTaskResult>> getRegionMap() { 418 return this.regionMap; 419 } 420 421 public int getTotalExpectedRegions() { 422 return this.regionMap.size(); 423 } 424 } 425 426 /** 427 * Run a single zookeeper Task and then exit. 428 */ 429 static class ZookeeperTask implements Callable<Void> { 430 private final Connection connection; 431 private final String host; 432 private String znode; 433 private final int timeout; 434 private ZookeeperStdOutSink sink; 435 436 public ZookeeperTask(Connection connection, String host, String znode, int timeout, 437 ZookeeperStdOutSink sink) { 438 this.connection = connection; 439 this.host = host; 440 this.znode = znode; 441 this.timeout = timeout; 442 this.sink = sink; 443 } 444 445 @Override 446 public Void call() throws Exception { 447 ZooKeeper zooKeeper = null; 448 try { 449 zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance); 450 Stat exists = zooKeeper.exists(znode, false); 451 StopWatch stopwatch = new StopWatch(); 452 stopwatch.start(); 453 zooKeeper.getData(znode, false, exists); 454 stopwatch.stop(); 455 sink.publishReadTiming(znode, host, stopwatch.getTime()); 456 } catch (KeeperException | InterruptedException e) { 457 sink.publishReadFailure(znode, host); 458 } finally { 459 if (zooKeeper != null) { 460 zooKeeper.close(); 461 } 462 } 
463 return null; 464 } 465 } 466 467 /** 468 * Run a single Region Task and then exit. For each column family of the Region, get one row and 469 * output latency or failure. 470 */ 471 static class RegionTask implements Callable<Void> { 472 public enum TaskType { 473 READ, 474 WRITE 475 } 476 477 private Connection connection; 478 private RegionInfo region; 479 private RegionStdOutSink sink; 480 private TaskType taskType; 481 private boolean rawScanEnabled; 482 private ServerName serverName; 483 private LongAdder readWriteLatency; 484 private boolean readAllCF; 485 486 RegionTask(Connection connection, RegionInfo region, ServerName serverName, 487 RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency, 488 boolean readAllCF) { 489 this.connection = connection; 490 this.region = region; 491 this.serverName = serverName; 492 this.sink = sink; 493 this.taskType = taskType; 494 this.rawScanEnabled = rawScanEnabled; 495 this.readWriteLatency = rwLatency; 496 this.readAllCF = readAllCF; 497 } 498 499 @Override 500 public Void call() { 501 switch (taskType) { 502 case READ: 503 return read(); 504 case WRITE: 505 return write(); 506 default: 507 return read(); 508 } 509 } 510 511 private Void readColumnFamily(Table table, ColumnFamilyDescriptor column) { 512 byte[] startKey = null; 513 Scan scan = null; 514 ResultScanner rs = null; 515 StopWatch stopWatch = new StopWatch(); 516 startKey = region.getStartKey(); 517 // Can't do a get on empty start row so do a Scan of first element if any instead. 518 if (startKey.length > 0) { 519 Get get = new Get(startKey); 520 get.setCacheBlocks(false); 521 get.setFilter(new FirstKeyOnlyFilter()); 522 get.addFamily(column.getName()); 523 // Converting get object to scan to enable RAW SCAN. 524 // This will work for all the regions of the HBase tables except first region of the table. 
525 scan = new Scan(get); 526 scan.setRaw(rawScanEnabled); 527 } else { 528 scan = new Scan(); 529 // In case of first region of the HBase Table, we do not have start-key for the region. 530 // For Region Canary, we only need to scan a single row/cell in the region to make sure that 531 // region is accessible. 532 // 533 // When HBase table has more than 1 empty regions at start of the row-key space, Canary will 534 // create multiple scan object to find first available row in the table by scanning all the 535 // regions in sequence until it can find first available row. 536 // 537 // This could result in multiple millions of scans based on the size of table and number of 538 // empty regions in sequence. In test environment, A table with no data and 1100 empty 539 // regions, Single canary run was creating close to half million to 1 million scans to 540 // successfully do canary run for the table. 541 // 542 // Since First region of the table doesn't have any start key, We should set End Key as 543 // stop row and set inclusive=false to limit scan to single region only. 544 // 545 // TODO : In future, we can streamline Canary behaviour for all the regions by doing scan 546 // with startRow inclusive and stopRow exclusive instead of different behaviour for First 547 // Region of the table and rest of the region of the table. This way implementation is 548 // simplified. As of now this change has been kept minimal to avoid any unnecessary 549 // perf impact. 
550 scan.withStopRow(region.getEndKey(), false); 551 LOG.debug("rawScan {} for {}", rawScanEnabled, region.getTable()); 552 scan.setRaw(rawScanEnabled); 553 scan.setCaching(1); 554 scan.setCacheBlocks(false); 555 scan.setFilter(new FirstKeyOnlyFilter()); 556 scan.addFamily(column.getName()); 557 scan.setMaxResultSize(1L); 558 scan.setOneRowLimit(); 559 } 560 LOG.debug("Reading from {} {} {} {}", region.getTable(), region.getRegionNameAsString(), 561 column.getNameAsString(), Bytes.toStringBinary(startKey)); 562 try { 563 stopWatch.start(); 564 rs = table.getScanner(scan); 565 rs.next(); 566 stopWatch.stop(); 567 this.readWriteLatency.add(stopWatch.getTime()); 568 sink.publishReadTiming(serverName, region, column, stopWatch.getTime()); 569 } catch (Exception e) { 570 sink.publishReadFailure(serverName, region, column, e); 571 sink.updateReadFailures(region.getRegionNameAsString(), 572 serverName == null ? "NULL" : serverName.getHostname()); 573 } finally { 574 if (rs != null) { 575 rs.close(); 576 } 577 } 578 return null; 579 } 580 581 private ColumnFamilyDescriptor randomPickOneColumnFamily(ColumnFamilyDescriptor[] cfs) { 582 int size = cfs.length; 583 return cfs[ThreadLocalRandom.current().nextInt(size)]; 584 585 } 586 587 public Void read() { 588 Table table = null; 589 TableDescriptor tableDesc = null; 590 try { 591 LOG.debug("Reading table descriptor for table {}", region.getTable()); 592 table = connection.getTable(region.getTable()); 593 tableDesc = table.getDescriptor(); 594 } catch (IOException e) { 595 LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e); 596 sink.publishReadFailure(serverName, region, e); 597 if (table != null) { 598 try { 599 table.close(); 600 } catch (IOException ioe) { 601 LOG.error("Close table failed", e); 602 } 603 } 604 return null; 605 } 606 607 if (readAllCF) { 608 for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { 609 readColumnFamily(table, column); 610 } 611 } else { 612 
readColumnFamily(table, randomPickOneColumnFamily(tableDesc.getColumnFamilies())); 613 } 614 try { 615 table.close(); 616 } catch (IOException e) { 617 LOG.error("Close table failed", e); 618 } 619 return null; 620 } 621 622 /** 623 * Check writes for the canary table 624 */ 625 private Void write() { 626 Table table = null; 627 TableDescriptor tableDesc = null; 628 try { 629 table = connection.getTable(region.getTable()); 630 tableDesc = table.getDescriptor(); 631 byte[] rowToCheck = region.getStartKey(); 632 if (rowToCheck.length == 0) { 633 rowToCheck = new byte[] { 0x0 }; 634 } 635 int writeValueSize = 636 connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10); 637 for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { 638 Put put = new Put(rowToCheck); 639 byte[] value = new byte[writeValueSize]; 640 Bytes.random(value); 641 put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value); 642 LOG.debug("Writing to {} {} {} {}", tableDesc.getTableName(), 643 region.getRegionNameAsString(), column.getNameAsString(), 644 Bytes.toStringBinary(rowToCheck)); 645 try { 646 long startTime = EnvironmentEdgeManager.currentTime(); 647 table.put(put); 648 long time = EnvironmentEdgeManager.currentTime() - startTime; 649 this.readWriteLatency.add(time); 650 sink.publishWriteTiming(serverName, region, column, time); 651 } catch (Exception e) { 652 sink.publishWriteFailure(serverName, region, column, e); 653 } 654 } 655 table.close(); 656 } catch (IOException e) { 657 sink.publishWriteFailure(serverName, region, e); 658 sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname()); 659 } 660 return null; 661 } 662 } 663 664 /** 665 * Run a single RegionServer Task and then exit. Get one row from a region on the regionserver and 666 * output latency or the failure. 
667 */ 668 static class RegionServerTask implements Callable<Void> { 669 private Connection connection; 670 private String serverName; 671 private RegionInfo region; 672 private RegionServerStdOutSink sink; 673 private Boolean rawScanEnabled; 674 private AtomicLong successes; 675 676 RegionServerTask(Connection connection, String serverName, RegionInfo region, 677 RegionServerStdOutSink sink, Boolean rawScanEnabled, AtomicLong successes) { 678 this.connection = connection; 679 this.serverName = serverName; 680 this.region = region; 681 this.sink = sink; 682 this.rawScanEnabled = rawScanEnabled; 683 this.successes = successes; 684 } 685 686 @Override 687 public Void call() { 688 TableName tableName = null; 689 Table table = null; 690 Get get = null; 691 byte[] startKey = null; 692 Scan scan = null; 693 StopWatch stopWatch = new StopWatch(); 694 // monitor one region on every region server 695 stopWatch.reset(); 696 try { 697 tableName = region.getTable(); 698 table = connection.getTable(tableName); 699 startKey = region.getStartKey(); 700 // Can't do a get on empty start row so do a Scan of first element if any instead. 701 LOG.debug("Reading from {} {} {} {}", serverName, region.getTable(), 702 region.getRegionNameAsString(), Bytes.toStringBinary(startKey)); 703 if (startKey.length > 0) { 704 get = new Get(startKey); 705 get.setCacheBlocks(false); 706 get.setFilter(new FirstKeyOnlyFilter()); 707 // Converting get object to scan to enable RAW SCAN. 708 // This will work for all the regions of the HBase tables except first region. 709 scan = new Scan(get); 710 711 } else { 712 scan = new Scan(); 713 // In case of first region of the HBase Table, we do not have start-key for the region. 714 // For Region Canary, we only need scan a single row/cell in the region to make sure that 715 // region is accessible. 
716 // 717 // When HBase table has more than 1 empty regions at start of the row-key space, Canary 718 // will create multiple scan object to find first available row in the table by scanning 719 // all the regions in sequence until it can find first available row. 720 // 721 // Since First region of the table doesn't have any start key, We should set End Key as 722 // stop row and set inclusive=false to limit scan to first region only. 723 scan.withStopRow(region.getEndKey(), false); 724 scan.setCacheBlocks(false); 725 scan.setFilter(new FirstKeyOnlyFilter()); 726 scan.setCaching(1); 727 scan.setMaxResultSize(1L); 728 scan.setOneRowLimit(); 729 } 730 scan.setRaw(rawScanEnabled); 731 stopWatch.start(); 732 ResultScanner s = table.getScanner(scan); 733 s.next(); 734 s.close(); 735 stopWatch.stop(); 736 successes.incrementAndGet(); 737 sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime()); 738 } catch (TableNotFoundException tnfe) { 739 LOG.error("Table may be deleted", tnfe); 740 // This is ignored because it doesn't imply that the regionserver is dead 741 } catch (TableNotEnabledException tnee) { 742 // This is considered a success since we got a response. 743 successes.incrementAndGet(); 744 LOG.debug("The targeted table was disabled. 
Assuming success."); 745 } catch (DoNotRetryIOException dnrioe) { 746 sink.publishReadFailure(tableName.getNameAsString(), serverName); 747 LOG.error(dnrioe.toString(), dnrioe); 748 } catch (IOException e) { 749 sink.publishReadFailure(tableName.getNameAsString(), serverName); 750 LOG.error(e.toString(), e); 751 } finally { 752 if (table != null) { 753 try { 754 table.close(); 755 } catch (IOException e) {/* DO NOTHING */ 756 LOG.error("Close table failed", e); 757 } 758 } 759 scan = null; 760 get = null; 761 startKey = null; 762 } 763 return null; 764 } 765 } 766 767 private static final int USAGE_EXIT_CODE = 1; 768 private static final int INIT_ERROR_EXIT_CODE = 2; 769 private static final int TIMEOUT_ERROR_EXIT_CODE = 3; 770 private static final int ERROR_EXIT_CODE = 4; 771 private static final int FAILURE_EXIT_CODE = 5; 772 773 private static final long DEFAULT_INTERVAL = 60000; 774 775 private static final long DEFAULT_TIMEOUT = 600000; // 10 mins 776 private static final int MAX_THREADS_NUM = 16; // #threads to contact regions 777 778 private static final Logger LOG = LoggerFactory.getLogger(Canary.class); 779 780 public static final TableName DEFAULT_WRITE_TABLE_NAME = 781 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary"); 782 783 private static final String CANARY_TABLE_FAMILY_NAME = "Test"; 784 785 private Configuration conf = null; 786 private long interval = 0; 787 private Sink sink = null; 788 789 /** 790 * True if we are to run in 'regionServer' mode. 791 */ 792 private boolean regionServerMode = false; 793 794 /** 795 * True if we are to run in zookeeper 'mode'. 796 */ 797 private boolean zookeeperMode = false; 798 799 /** 800 * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e. we 801 * aggregate time to fetch each region and it needs to be less than this value else we log an 802 * ERROR. 
803 */ 804 private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>(); 805 806 public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS = 807 "hbase.canary.regionserver_all_regions"; 808 809 public static final String HBASE_CANARY_REGION_WRITE_SNIFFING = 810 "hbase.canary.region.write.sniffing"; 811 public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT = 812 "hbase.canary.region.write.table.timeout"; 813 public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME = 814 "hbase.canary.region.write.table.name"; 815 public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT = 816 "hbase.canary.region.read.table.timeout"; 817 818 public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES = 819 "hbase.canary.zookeeper.permitted.failures"; 820 821 public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex"; 822 public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout"; 823 public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error"; 824 825 private ExecutorService executor; // threads to retrieve data from regionservers 826 827 public CanaryTool() { 828 this(new ScheduledThreadPoolExecutor(1)); 829 } 830 831 public CanaryTool(ExecutorService executor) { 832 this(executor, null); 833 } 834 835 @InterfaceAudience.Private 836 CanaryTool(ExecutorService executor, Sink sink) { 837 this.executor = executor; 838 this.sink = sink; 839 } 840 841 CanaryTool(Configuration conf, ExecutorService executor) { 842 this(conf, executor, null); 843 } 844 845 CanaryTool(Configuration conf, ExecutorService executor, Sink sink) { 846 this(executor, sink); 847 setConf(conf); 848 } 849 850 @Override 851 public Configuration getConf() { 852 return conf; 853 } 854 855 @Override 856 public void setConf(Configuration conf) { 857 if (conf == null) { 858 conf = HBaseConfiguration.create(); 859 } 860 this.conf = conf; 861 } 862 863 private int parseArgs(String[] args) { 864 int index = 
-1; 865 long permittedFailures = 0; 866 boolean regionServerAllRegions = false, writeSniffing = false; 867 String readTableTimeoutsStr = null; 868 // Process command line args 869 for (int i = 0; i < args.length; i++) { 870 String cmd = args[i]; 871 if (cmd.startsWith("-")) { 872 if (index >= 0) { 873 // command line args must be in the form: [opts] [table 1 [table 2 ...]] 874 System.err.println("Invalid command line options"); 875 printUsageAndExit(); 876 } 877 if (cmd.equals("-help") || cmd.equals("-h")) { 878 // user asked for help, print the help and quit. 879 printUsageAndExit(); 880 } else if (cmd.equals("-daemon") && interval == 0) { 881 // user asked for daemon mode, set a default interval between checks 882 interval = DEFAULT_INTERVAL; 883 } else if (cmd.equals("-interval")) { 884 // user has specified an interval for canary breaths (-interval N) 885 i++; 886 887 if (i == args.length) { 888 System.err.println("-interval takes a numeric seconds value argument."); 889 printUsageAndExit(); 890 } 891 try { 892 interval = Long.parseLong(args[i]) * 1000; 893 } catch (NumberFormatException e) { 894 System.err.println("-interval needs a numeric value argument."); 895 printUsageAndExit(); 896 } 897 } else if (cmd.equals("-zookeeper")) { 898 this.zookeeperMode = true; 899 } else if (cmd.equals("-regionserver")) { 900 this.regionServerMode = true; 901 } else if (cmd.equals("-allRegions")) { 902 conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true); 903 regionServerAllRegions = true; 904 } else if (cmd.equals("-writeSniffing")) { 905 writeSniffing = true; 906 conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true); 907 } else if (cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) { 908 conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); 909 } else if (cmd.equals("-e")) { 910 conf.setBoolean(HBASE_CANARY_USE_REGEX, true); 911 } else if (cmd.equals("-t")) { 912 i++; 913 914 if (i == args.length) { 915 System.err.println("-t takes a numeric 
milliseconds value argument."); 916 printUsageAndExit(); 917 } 918 long timeout = 0; 919 try { 920 timeout = Long.parseLong(args[i]); 921 } catch (NumberFormatException e) { 922 System.err.println("-t takes a numeric milliseconds value argument."); 923 printUsageAndExit(); 924 } 925 conf.setLong(HBASE_CANARY_TIMEOUT, timeout); 926 } else if (cmd.equals("-writeTableTimeout")) { 927 i++; 928 929 if (i == args.length) { 930 System.err.println("-writeTableTimeout takes a numeric milliseconds value argument."); 931 printUsageAndExit(); 932 } 933 long configuredWriteTableTimeout = 0; 934 try { 935 configuredWriteTableTimeout = Long.parseLong(args[i]); 936 } catch (NumberFormatException e) { 937 System.err.println("-writeTableTimeout takes a numeric milliseconds value argument."); 938 printUsageAndExit(); 939 } 940 conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout); 941 } else if (cmd.equals("-writeTable")) { 942 i++; 943 944 if (i == args.length) { 945 System.err.println("-writeTable takes a string tablename value argument."); 946 printUsageAndExit(); 947 } 948 conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]); 949 } else if (cmd.equals("-f")) { 950 i++; 951 if (i == args.length) { 952 System.err.println("-f needs a boolean value argument (true|false)."); 953 printUsageAndExit(); 954 } 955 956 conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(args[i])); 957 } else if (cmd.equals("-readTableTimeouts")) { 958 i++; 959 if (i == args.length) { 960 System.err.println("-readTableTimeouts needs a comma-separated list of read " 961 + "millisecond timeouts per table (without spaces)."); 962 printUsageAndExit(); 963 } 964 readTableTimeoutsStr = args[i]; 965 conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr); 966 } else if (cmd.equals("-permittedZookeeperFailures")) { 967 i++; 968 969 if (i == args.length) { 970 System.err.println("-permittedZookeeperFailures needs a numeric value argument."); 971 
printUsageAndExit(); 972 } 973 try { 974 permittedFailures = Long.parseLong(args[i]); 975 } catch (NumberFormatException e) { 976 System.err.println("-permittedZookeeperFailures needs a numeric value argument."); 977 printUsageAndExit(); 978 } 979 conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures); 980 } else { 981 // no options match 982 System.err.println(cmd + " options is invalid."); 983 printUsageAndExit(); 984 } 985 } else if (index < 0) { 986 // keep track of first table name specified by the user 987 index = i; 988 } 989 } 990 if (regionServerAllRegions && !this.regionServerMode) { 991 System.err.println("-allRegions can only be specified in regionserver mode."); 992 printUsageAndExit(); 993 } 994 if (this.zookeeperMode) { 995 if (this.regionServerMode || regionServerAllRegions || writeSniffing) { 996 System.err.println("-zookeeper is exclusive and cannot be combined with " + "other modes."); 997 printUsageAndExit(); 998 } 999 } 1000 if (permittedFailures != 0 && !this.zookeeperMode) { 1001 System.err.println("-permittedZookeeperFailures requires -zookeeper mode."); 1002 printUsageAndExit(); 1003 } 1004 if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) { 1005 System.err.println("-readTableTimeouts can only be configured in region mode."); 1006 printUsageAndExit(); 1007 } 1008 return index; 1009 } 1010 1011 @Override 1012 public int run(String[] args) throws Exception { 1013 int index = parseArgs(args); 1014 String[] monitorTargets = null; 1015 1016 if (index >= 0) { 1017 int length = args.length - index; 1018 monitorTargets = new String[length]; 1019 System.arraycopy(args, index, monitorTargets, 0, length); 1020 } 1021 if (interval > 0) { 1022 // Only show the web page in daemon mode 1023 putUpWebUI(); 1024 } 1025 if (zookeeperMode) { 1026 return checkZooKeeper(); 1027 } else if (regionServerMode) { 1028 return checkRegionServers(monitorTargets); 1029 } else { 1030 return checkRegions(monitorTargets); 
1031 } 1032 } 1033 1034 private int runMonitor(String[] monitorTargets) throws Exception { 1035 ChoreService choreService = null; 1036 1037 // Launches chore for refreshing kerberos credentials if security is enabled. 1038 // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster 1039 // for more details. 1040 final ScheduledChore authChore = AuthUtil.getAuthChore(conf); 1041 if (authChore != null) { 1042 choreService = new ChoreService("CANARY_TOOL"); 1043 choreService.scheduleChore(authChore); 1044 } 1045 1046 // Start to prepare the stuffs 1047 Monitor monitor = null; 1048 Thread monitorThread; 1049 long startTime = 0; 1050 long currentTimeLength = 0; 1051 boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); 1052 long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT); 1053 // Get a connection to use in below. 1054 try (Connection connection = ConnectionFactory.createConnection(this.conf)) { 1055 do { 1056 // Do monitor !! 
1057 try { 1058 monitor = this.newMonitor(connection, monitorTargets); 1059 startTime = EnvironmentEdgeManager.currentTime(); 1060 monitorThread = new Thread(monitor, "CanaryMonitor-" + startTime); 1061 monitorThread.start(); 1062 while (!monitor.isDone()) { 1063 // wait for 1 sec 1064 Thread.sleep(1000); 1065 // exit if any error occurs 1066 if (failOnError && monitor.hasError()) { 1067 monitorThread.interrupt(); 1068 if (monitor.initialized) { 1069 return monitor.errorCode; 1070 } else { 1071 return INIT_ERROR_EXIT_CODE; 1072 } 1073 } 1074 currentTimeLength = EnvironmentEdgeManager.currentTime() - startTime; 1075 if (currentTimeLength > timeout) { 1076 LOG.error("The monitor is running too long (" + currentTimeLength 1077 + ") after timeout limit:" + timeout + " will be killed itself !!"); 1078 if (monitor.initialized) { 1079 return TIMEOUT_ERROR_EXIT_CODE; 1080 } else { 1081 return INIT_ERROR_EXIT_CODE; 1082 } 1083 } 1084 } 1085 1086 if (failOnError && monitor.finalCheckForErrors()) { 1087 monitorThread.interrupt(); 1088 return monitor.errorCode; 1089 } 1090 } finally { 1091 if (monitor != null) { 1092 monitor.close(); 1093 } 1094 } 1095 1096 Thread.sleep(interval); 1097 } while (interval > 0); 1098 } // try-with-resources close 1099 1100 if (choreService != null) { 1101 choreService.shutdown(); 1102 } 1103 return monitor.errorCode; 1104 } 1105 1106 @Override 1107 public Map<String, String> getReadFailures() { 1108 return sink.getReadFailures(); 1109 } 1110 1111 @Override 1112 public Map<String, String> getWriteFailures() { 1113 return sink.getWriteFailures(); 1114 } 1115 1116 private void printUsageAndExit() { 1117 System.err.println( 1118 "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2]...] 
| [<REGIONSERVER1> [<REGIONSERVER2]..]"); 1119 System.err.println("Where [OPTIONS] are:"); 1120 System.err.println(" -h,-help show this help and exit."); 1121 System.err.println( 1122 " -regionserver set 'regionserver mode'; gets row from random region on " + "server"); 1123 System.err.println( 1124 " -allRegions get from ALL regions when 'regionserver mode', not just " + "random one."); 1125 System.err.println(" -zookeeper set 'zookeeper mode'; grab zookeeper.znode.parent on " 1126 + "each ensemble member"); 1127 System.err.println(" -daemon continuous check at defined intervals."); 1128 System.err.println(" -interval <N> interval between checks in seconds"); 1129 System.err 1130 .println(" -e consider table/regionserver argument as regular " + "expression"); 1131 System.err.println(" -f <B> exit on first error; default=true"); 1132 System.err.println(" -failureAsError treat read/write failure as error"); 1133 System.err.println(" -t <N> timeout for canary-test run; default=600000ms"); 1134 System.err.println(" -writeSniffing enable write sniffing"); 1135 System.err.println(" -writeTable the table used for write sniffing; default=hbase:canary"); 1136 System.err.println(" -writeTableTimeout <N> timeout for writeTable; default=600000ms"); 1137 System.err.println( 1138 " -readTableTimeouts <tableName>=<read timeout>," + "<tableName>=<read timeout>,..."); 1139 System.err 1140 .println(" comma-separated list of table read timeouts " + "(no spaces);"); 1141 System.err.println(" logs 'ERROR' if takes longer. 
default=600000ms"); 1142 System.err.println(" -permittedZookeeperFailures <N> Ignore first N failures attempting to "); 1143 System.err.println(" connect to individual zookeeper nodes in ensemble"); 1144 System.err.println(""); 1145 System.err.println(" -D<configProperty>=<value> to assign or override configuration params"); 1146 System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable " 1147 + "raw scan; default=false"); 1148 System.err.println( 1149 " -Dhbase.canary.info.port=PORT_NUMBER Set for a Canary UI; " + "default=-1 (None)"); 1150 System.err.println(""); 1151 System.err.println( 1152 "Canary runs in one of three modes: region (default), regionserver, or " + "zookeeper."); 1153 System.err.println("To sniff/probe all regions, pass no arguments."); 1154 System.err.println("To sniff/probe all regions of a table, pass tablename."); 1155 System.err.println("To sniff/probe regionservers, pass -regionserver, etc."); 1156 System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation."); 1157 System.exit(USAGE_EXIT_CODE); 1158 } 1159 1160 Sink getSink(Configuration configuration, Class clazz) { 1161 // In test context, this.sink might be set. Use it if non-null. For testing. 1162 return this.sink != null 1163 ? 
this.sink 1164 : (Sink) ReflectionUtils 1165 .newInstance(configuration.getClass("hbase.canary.sink.class", clazz, Sink.class)); 1166 } 1167 1168 /** 1169 * Canary region mode-specific data structure which stores information about each region to be 1170 * scanned 1171 */ 1172 public static class RegionTaskResult { 1173 private RegionInfo region; 1174 private TableName tableName; 1175 private ServerName serverName; 1176 private ColumnFamilyDescriptor column; 1177 private AtomicLong readLatency = null; 1178 private AtomicLong writeLatency = null; 1179 private boolean readSuccess = false; 1180 private boolean writeSuccess = false; 1181 1182 public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName, 1183 ColumnFamilyDescriptor column) { 1184 this.region = region; 1185 this.tableName = tableName; 1186 this.serverName = serverName; 1187 this.column = column; 1188 } 1189 1190 public RegionInfo getRegionInfo() { 1191 return this.region; 1192 } 1193 1194 public String getRegionNameAsString() { 1195 return this.region.getRegionNameAsString(); 1196 } 1197 1198 public TableName getTableName() { 1199 return this.tableName; 1200 } 1201 1202 public String getTableNameAsString() { 1203 return this.tableName.getNameAsString(); 1204 } 1205 1206 public ServerName getServerName() { 1207 return this.serverName; 1208 } 1209 1210 public String getServerNameAsString() { 1211 return this.serverName.getServerName(); 1212 } 1213 1214 public ColumnFamilyDescriptor getColumnFamily() { 1215 return this.column; 1216 } 1217 1218 public String getColumnFamilyNameAsString() { 1219 return this.column.getNameAsString(); 1220 } 1221 1222 public long getReadLatency() { 1223 if (this.readLatency == null) { 1224 return -1; 1225 } 1226 return this.readLatency.get(); 1227 } 1228 1229 public void setReadLatency(long readLatency) { 1230 if (this.readLatency != null) { 1231 this.readLatency.set(readLatency); 1232 } else { 1233 this.readLatency = new AtomicLong(readLatency); 1234 } 
1235 } 1236 1237 public long getWriteLatency() { 1238 if (this.writeLatency == null) { 1239 return -1; 1240 } 1241 return this.writeLatency.get(); 1242 } 1243 1244 public void setWriteLatency(long writeLatency) { 1245 if (this.writeLatency != null) { 1246 this.writeLatency.set(writeLatency); 1247 } else { 1248 this.writeLatency = new AtomicLong(writeLatency); 1249 } 1250 } 1251 1252 public boolean isReadSuccess() { 1253 return this.readSuccess; 1254 } 1255 1256 public void setReadSuccess() { 1257 this.readSuccess = true; 1258 } 1259 1260 public boolean isWriteSuccess() { 1261 return this.writeSuccess; 1262 } 1263 1264 public void setWriteSuccess() { 1265 this.writeSuccess = true; 1266 } 1267 } 1268 1269 /** 1270 * A Factory method for {@link Monitor}. Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a 1271 * RegionMonitor. 1272 * @return a Monitor instance 1273 */ 1274 private Monitor newMonitor(final Connection connection, String[] monitorTargets) { 1275 Monitor monitor; 1276 boolean useRegExp = conf.getBoolean(HBASE_CANARY_USE_REGEX, false); 1277 boolean regionServerAllRegions = conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false); 1278 boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); 1279 int permittedFailures = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0); 1280 boolean writeSniffing = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false); 1281 String writeTableName = 1282 conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME, DEFAULT_WRITE_TABLE_NAME.getNameAsString()); 1283 long configuredWriteTableTimeout = 1284 conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT); 1285 1286 if (this.regionServerMode) { 1287 monitor = new RegionServerMonitor(connection, monitorTargets, useRegExp, 1288 getSink(connection.getConfiguration(), RegionServerStdOutSink.class), this.executor, 1289 regionServerAllRegions, failOnError, permittedFailures); 1290 1291 } else if (this.zookeeperMode) { 1292 monitor = new 
ZookeeperMonitor(connection, monitorTargets, useRegExp, 1293 getSink(connection.getConfiguration(), ZookeeperStdOutSink.class), this.executor, 1294 failOnError, permittedFailures); 1295 } else { 1296 monitor = new RegionMonitor(connection, monitorTargets, useRegExp, 1297 getSink(connection.getConfiguration(), RegionStdOutSink.class), this.executor, 1298 writeSniffing, TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts, 1299 configuredWriteTableTimeout, permittedFailures); 1300 } 1301 return monitor; 1302 } 1303 1304 private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) { 1305 String[] tableTimeouts = configuredReadTableTimeoutsStr.split(","); 1306 for (String tT : tableTimeouts) { 1307 String[] nameTimeout = tT.split("="); 1308 if (nameTimeout.length < 2) { 1309 throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form " 1310 + "<tableName>=<read timeout> (without spaces)."); 1311 } 1312 long timeoutVal; 1313 try { 1314 timeoutVal = Long.parseLong(nameTimeout[1]); 1315 } catch (NumberFormatException e) { 1316 throw new IllegalArgumentException( 1317 "-readTableTimeouts read timeout for each table" + " must be a numeric value argument."); 1318 } 1319 configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal); 1320 } 1321 } 1322 1323 /** 1324 * A Monitor super-class can be extended by users 1325 */ 1326 public static abstract class Monitor implements Runnable, Closeable { 1327 protected Connection connection; 1328 protected Admin admin; 1329 /** 1330 * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes. Passed on the 1331 * command-line as arguments. 
1332 */ 1333 protected String[] targets; 1334 protected boolean useRegExp; 1335 protected boolean treatFailureAsError; 1336 protected boolean initialized = false; 1337 1338 protected boolean done = false; 1339 protected int errorCode = 0; 1340 protected long allowedFailures = 0; 1341 protected Sink sink; 1342 protected ExecutorService executor; 1343 1344 public boolean isDone() { 1345 return done; 1346 } 1347 1348 public boolean hasError() { 1349 return errorCode != 0; 1350 } 1351 1352 public boolean finalCheckForErrors() { 1353 if (errorCode != 0) { 1354 return true; 1355 } 1356 if ( 1357 treatFailureAsError && (sink.getReadFailureCount() > allowedFailures 1358 || sink.getWriteFailureCount() > allowedFailures) 1359 ) { 1360 LOG.error("Too many failures detected, treating failure as error, failing the Canary."); 1361 errorCode = FAILURE_EXIT_CODE; 1362 return true; 1363 } 1364 return false; 1365 } 1366 1367 @Override 1368 public void close() throws IOException { 1369 if (this.admin != null) { 1370 this.admin.close(); 1371 } 1372 } 1373 1374 protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink, 1375 ExecutorService executor, boolean treatFailureAsError, long allowedFailures) { 1376 if (null == connection) { 1377 throw new IllegalArgumentException("connection shall not be null"); 1378 } 1379 1380 this.connection = connection; 1381 this.targets = monitorTargets; 1382 this.useRegExp = useRegExp; 1383 this.treatFailureAsError = treatFailureAsError; 1384 this.sink = sink; 1385 this.executor = executor; 1386 this.allowedFailures = allowedFailures; 1387 } 1388 1389 @Override 1390 public abstract void run(); 1391 1392 protected boolean initAdmin() { 1393 if (null == this.admin) { 1394 try { 1395 this.admin = this.connection.getAdmin(); 1396 } catch (Exception e) { 1397 LOG.error("Initial HBaseAdmin failed...", e); 1398 this.errorCode = INIT_ERROR_EXIT_CODE; 1399 } 1400 } else if (admin.isAborted()) { 1401 LOG.error("HBaseAdmin 
aborted"); 1402 this.errorCode = INIT_ERROR_EXIT_CODE; 1403 } 1404 return !this.hasError(); 1405 } 1406 } 1407 1408 /** 1409 * A monitor for region mode. 1410 */ 1411 private static class RegionMonitor extends Monitor { 1412 // 10 minutes 1413 private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000; 1414 // 1 days 1415 private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60; 1416 1417 private long lastCheckTime = -1; 1418 private boolean writeSniffing; 1419 private TableName writeTableName; 1420 private int writeDataTTL; 1421 private float regionsLowerLimit; 1422 private float regionsUpperLimit; 1423 private int checkPeriod; 1424 private boolean rawScanEnabled; 1425 private boolean readAllCF; 1426 1427 /** 1428 * This is a timeout per table. If read of each region in the table aggregated takes longer than 1429 * what is configured here, we log an ERROR rather than just an INFO. 1430 */ 1431 private HashMap<String, Long> configuredReadTableTimeouts; 1432 1433 private long configuredWriteTableTimeout; 1434 1435 public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, 1436 Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName, 1437 boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts, 1438 long configuredWriteTableTimeout, long allowedFailures) { 1439 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, 1440 allowedFailures); 1441 Configuration conf = connection.getConfiguration(); 1442 this.writeSniffing = writeSniffing; 1443 this.writeTableName = writeTableName; 1444 this.writeDataTTL = 1445 conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL); 1446 this.regionsLowerLimit = 1447 conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f); 1448 this.regionsUpperLimit = 1449 conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f); 1450 
this.checkPeriod = conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY, 1451 DEFAULT_WRITE_TABLE_CHECK_PERIOD); 1452 this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false); 1453 this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts); 1454 this.configuredWriteTableTimeout = configuredWriteTableTimeout; 1455 this.readAllCF = conf.getBoolean(HConstants.HBASE_CANARY_READ_ALL_CF, true); 1456 } 1457 1458 private RegionStdOutSink getSink() { 1459 if (!(sink instanceof RegionStdOutSink)) { 1460 throw new RuntimeException("Can only write to Region sink"); 1461 } 1462 return ((RegionStdOutSink) sink); 1463 } 1464 1465 @Override 1466 public void run() { 1467 if (this.initAdmin()) { 1468 try { 1469 List<Future<Void>> taskFutures = new LinkedList<>(); 1470 RegionStdOutSink regionSink = this.getSink(); 1471 regionSink.resetFailuresCountDetails(); 1472 if (this.targets != null && this.targets.length > 0) { 1473 String[] tables = generateMonitorTables(this.targets); 1474 // Check to see that each table name passed in the -readTableTimeouts argument is also 1475 // passed as a monitor target. 
1476 if ( 1477 !new HashSet<>(Arrays.asList(tables)) 1478 .containsAll(this.configuredReadTableTimeouts.keySet()) 1479 ) { 1480 LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets " 1481 + "passed via command line."); 1482 this.errorCode = USAGE_EXIT_CODE; 1483 return; 1484 } 1485 this.initialized = true; 1486 for (String table : tables) { 1487 LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table); 1488 taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ, 1489 this.rawScanEnabled, readLatency, readAllCF)); 1490 } 1491 } else { 1492 taskFutures.addAll(sniff(TaskType.READ, regionSink)); 1493 } 1494 1495 if (writeSniffing) { 1496 if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) { 1497 try { 1498 checkWriteTableDistribution(); 1499 } catch (IOException e) { 1500 LOG.error("Check canary table distribution failed!", e); 1501 } 1502 lastCheckTime = EnvironmentEdgeManager.currentTime(); 1503 } 1504 // sniff canary table with write operation 1505 regionSink.initializeWriteLatency(); 1506 LongAdder writeTableLatency = regionSink.getWriteLatency(); 1507 taskFutures 1508 .addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName), 1509 executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency, readAllCF)); 1510 } 1511 1512 for (Future<Void> future : taskFutures) { 1513 try { 1514 future.get(); 1515 } catch (ExecutionException e) { 1516 LOG.error("Sniff region failed!", e); 1517 } 1518 } 1519 Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap(); 1520 for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) { 1521 String tableName = entry.getKey(); 1522 if (actualReadTableLatency.containsKey(tableName)) { 1523 Long actual = actualReadTableLatency.get(tableName).longValue(); 1524 Long configured = entry.getValue(); 1525 if (actual > configured) { 1526 LOG.error("Read operation for {} took {}ms 
exceeded the configured read timeout." 1527 + "(Configured read timeout {}ms.", tableName, actual, configured); 1528 } else { 1529 LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.", 1530 tableName, actual, configured); 1531 } 1532 } else { 1533 LOG.error("Read operation for {} failed!", tableName); 1534 } 1535 } 1536 if (this.writeSniffing) { 1537 String writeTableStringName = this.writeTableName.getNameAsString(); 1538 long actualWriteLatency = regionSink.getWriteLatency().longValue(); 1539 LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.", 1540 writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout); 1541 // Check that the writeTable write operation latency does not exceed the configured 1542 // timeout. 1543 if (actualWriteLatency > this.configuredWriteTableTimeout) { 1544 LOG.error("Write operation for {} exceeded the configured write timeout.", 1545 writeTableStringName); 1546 } 1547 } 1548 } catch (Exception e) { 1549 LOG.error("Run regionMonitor failed", e); 1550 this.errorCode = ERROR_EXIT_CODE; 1551 } finally { 1552 this.done = true; 1553 } 1554 } 1555 this.done = true; 1556 } 1557 1558 /** Returns List of tables to use in test. 
*/ 1559 private String[] generateMonitorTables(String[] monitorTargets) throws IOException { 1560 String[] returnTables = null; 1561 1562 if (this.useRegExp) { 1563 Pattern pattern = null; 1564 List<TableDescriptor> tds = null; 1565 Set<String> tmpTables = new TreeSet<>(); 1566 try { 1567 LOG.debug(String.format("reading list of tables")); 1568 tds = this.admin.listTableDescriptors(pattern); 1569 if (tds == null) { 1570 tds = Collections.emptyList(); 1571 } 1572 for (String monitorTarget : monitorTargets) { 1573 pattern = Pattern.compile(monitorTarget); 1574 for (TableDescriptor td : tds) { 1575 if (pattern.matcher(td.getTableName().getNameAsString()).matches()) { 1576 tmpTables.add(td.getTableName().getNameAsString()); 1577 } 1578 } 1579 } 1580 } catch (IOException e) { 1581 LOG.error("Communicate with admin failed", e); 1582 throw e; 1583 } 1584 1585 if (tmpTables.size() > 0) { 1586 returnTables = tmpTables.toArray(new String[tmpTables.size()]); 1587 } else { 1588 String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets); 1589 LOG.error(msg); 1590 this.errorCode = INIT_ERROR_EXIT_CODE; 1591 throw new TableNotFoundException(msg); 1592 } 1593 } else { 1594 returnTables = monitorTargets; 1595 } 1596 1597 return returnTables; 1598 } 1599 1600 /* 1601 * Canary entry point to monitor all the tables. 
1602 */ 1603 private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink) 1604 throws Exception { 1605 LOG.debug("Reading list of tables"); 1606 List<Future<Void>> taskFutures = new LinkedList<>(); 1607 for (TableDescriptor td : admin.listTableDescriptors()) { 1608 if ( 1609 admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName()) 1610 && (!td.getTableName().equals(writeTableName)) 1611 ) { 1612 LongAdder readLatency = 1613 regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString()); 1614 taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType, 1615 this.rawScanEnabled, readLatency, readAllCF)); 1616 } 1617 } 1618 return taskFutures; 1619 } 1620 1621 private void checkWriteTableDistribution() throws IOException { 1622 if (!admin.tableExists(writeTableName)) { 1623 int numberOfServers = admin.getRegionServers().size(); 1624 if (numberOfServers == 0) { 1625 throw new IllegalStateException("No live regionservers"); 1626 } 1627 createWriteTable(numberOfServers); 1628 } 1629 1630 if (!admin.isTableEnabled(writeTableName)) { 1631 admin.enableTable(writeTableName); 1632 } 1633 1634 ClusterMetrics status = 1635 admin.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER)); 1636 int numberOfServers = status.getServersName().size(); 1637 if (status.getServersName().contains(status.getMasterName())) { 1638 numberOfServers -= 1; 1639 } 1640 1641 List<Pair<RegionInfo, ServerName>> pairs = 1642 MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName); 1643 int numberOfRegions = pairs.size(); 1644 if ( 1645 numberOfRegions < numberOfServers * regionsLowerLimit 1646 || numberOfRegions > numberOfServers * regionsUpperLimit 1647 ) { 1648 admin.disableTable(writeTableName); 1649 admin.deleteTable(writeTableName); 1650 createWriteTable(numberOfServers); 1651 } 1652 HashSet<ServerName> serverSet = new HashSet<>(); 1653 for (Pair<RegionInfo, ServerName> pair : pairs) { 1654 
serverSet.add(pair.getSecond()); 1655 } 1656 int numberOfCoveredServers = serverSet.size(); 1657 if (numberOfCoveredServers < numberOfServers) { 1658 admin.balance(); 1659 } 1660 } 1661 1662 private void createWriteTable(int numberOfServers) throws IOException { 1663 int numberOfRegions = (int) (numberOfServers * regionsLowerLimit); 1664 LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions " 1665 + "(current lower limit of regions per server is {} and you can change it with config {}).", 1666 numberOfServers, numberOfRegions, regionsLowerLimit, 1667 HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY); 1668 HTableDescriptor desc = new HTableDescriptor(writeTableName); 1669 HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME); 1670 family.setMaxVersions(1); 1671 family.setTimeToLive(writeDataTTL); 1672 1673 desc.addFamily(family); 1674 byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions); 1675 admin.createTable(desc, splits); 1676 } 1677 } 1678 1679 /** 1680 * Canary entry point for specified table. 1681 * @throws Exception exception 1682 */ 1683 private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName, 1684 ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency, 1685 boolean readAllCF) throws Exception { 1686 LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName); 1687 if (admin.isTableEnabled(TableName.valueOf(tableName))) { 1688 return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)), 1689 executor, taskType, rawScanEnabled, readLatency, readAllCF); 1690 } else { 1691 LOG.warn("Table {} is not enabled", tableName); 1692 } 1693 return new LinkedList<>(); 1694 } 1695 1696 /* 1697 * Loops over regions of this table, and outputs information about the state. 
1698 */ 1699 private static List<Future<Void>> sniff(final Admin admin, final Sink sink, 1700 TableDescriptor tableDesc, ExecutorService executor, TaskType taskType, boolean rawScanEnabled, 1701 LongAdder rwLatency, boolean readAllCF) throws Exception { 1702 LOG.debug("Reading list of regions for table {}", tableDesc.getTableName()); 1703 try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) { 1704 List<RegionTask> tasks = new ArrayList<>(); 1705 try (RegionLocator regionLocator = 1706 admin.getConnection().getRegionLocator(tableDesc.getTableName())) { 1707 for (HRegionLocation location : regionLocator.getAllRegionLocations()) { 1708 if (location == null) { 1709 LOG.warn("Null location"); 1710 continue; 1711 } 1712 ServerName rs = location.getServerName(); 1713 RegionInfo region = location.getRegion(); 1714 tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink, 1715 taskType, rawScanEnabled, rwLatency, readAllCF)); 1716 Map<String, List<RegionTaskResult>> regionMap = ((RegionStdOutSink) sink).getRegionMap(); 1717 regionMap.put(region.getRegionNameAsString(), new ArrayList<RegionTaskResult>()); 1718 } 1719 return executor.invokeAll(tasks); 1720 } 1721 } catch (TableNotFoundException e) { 1722 return Collections.EMPTY_LIST; 1723 } 1724 } 1725 1726 // monitor for zookeeper mode 1727 private static class ZookeeperMonitor extends Monitor { 1728 private List<String> hosts; 1729 private final String znode; 1730 private final int timeout; 1731 1732 protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, 1733 Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) { 1734 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, 1735 allowedFailures); 1736 Configuration configuration = connection.getConfiguration(); 1737 znode = configuration.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT); 1738 timeout = 1739 
configuration.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); 1740 ConnectStringParser parser = 1741 new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration)); 1742 hosts = Lists.newArrayList(); 1743 for (InetSocketAddress server : parser.getServerAddresses()) { 1744 hosts.add(inetSocketAddress2String(server)); 1745 } 1746 if (allowedFailures > (hosts.size() - 1) / 2) { 1747 LOG.warn( 1748 "Confirm allowable number of failed ZooKeeper nodes, as quorum will " 1749 + "already be lost. Setting of {} failures is unexpected for {} ensemble size.", 1750 allowedFailures, hosts.size()); 1751 } 1752 } 1753 1754 @Override 1755 public void run() { 1756 List<ZookeeperTask> tasks = Lists.newArrayList(); 1757 ZookeeperStdOutSink zkSink = null; 1758 try { 1759 zkSink = this.getSink(); 1760 } catch (RuntimeException e) { 1761 LOG.error("Run ZooKeeperMonitor failed!", e); 1762 this.errorCode = ERROR_EXIT_CODE; 1763 } 1764 this.initialized = true; 1765 for (final String host : hosts) { 1766 tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink)); 1767 } 1768 try { 1769 for (Future<Void> future : this.executor.invokeAll(tasks)) { 1770 try { 1771 future.get(); 1772 } catch (ExecutionException e) { 1773 LOG.error("Sniff zookeeper failed!", e); 1774 this.errorCode = ERROR_EXIT_CODE; 1775 } 1776 } 1777 } catch (InterruptedException e) { 1778 this.errorCode = ERROR_EXIT_CODE; 1779 Thread.currentThread().interrupt(); 1780 LOG.error("Sniff zookeeper interrupted!", e); 1781 } 1782 this.done = true; 1783 } 1784 1785 private ZookeeperStdOutSink getSink() { 1786 if (!(sink instanceof ZookeeperStdOutSink)) { 1787 throw new RuntimeException("Can only write to zookeeper sink"); 1788 } 1789 return ((ZookeeperStdOutSink) sink); 1790 } 1791 } 1792 1793 /** 1794 * A monitor for regionserver mode 1795 */ 1796 private static class RegionServerMonitor extends Monitor { 1797 private boolean rawScanEnabled; 1798 private boolean allRegions; 1799 
1800 public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, 1801 Sink sink, ExecutorService executor, boolean allRegions, boolean treatFailureAsError, 1802 long allowedFailures) { 1803 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, 1804 allowedFailures); 1805 Configuration conf = connection.getConfiguration(); 1806 this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false); 1807 this.allRegions = allRegions; 1808 } 1809 1810 private RegionServerStdOutSink getSink() { 1811 if (!(sink instanceof RegionServerStdOutSink)) { 1812 throw new RuntimeException("Can only write to regionserver sink"); 1813 } 1814 return ((RegionServerStdOutSink) sink); 1815 } 1816 1817 @Override 1818 public void run() { 1819 if (this.initAdmin() && this.checkNoTableNames()) { 1820 RegionServerStdOutSink regionServerSink = null; 1821 try { 1822 regionServerSink = this.getSink(); 1823 } catch (RuntimeException e) { 1824 LOG.error("Run RegionServerMonitor failed!", e); 1825 this.errorCode = ERROR_EXIT_CODE; 1826 } 1827 Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName(); 1828 this.initialized = true; 1829 this.monitorRegionServers(rsAndRMap, regionServerSink); 1830 } 1831 this.done = true; 1832 } 1833 1834 private boolean checkNoTableNames() { 1835 List<String> foundTableNames = new ArrayList<>(); 1836 TableName[] tableNames = null; 1837 LOG.debug("Reading list of tables"); 1838 try { 1839 tableNames = this.admin.listTableNames(); 1840 } catch (IOException e) { 1841 LOG.error("Get listTableNames failed", e); 1842 this.errorCode = INIT_ERROR_EXIT_CODE; 1843 return false; 1844 } 1845 1846 if (this.targets == null || this.targets.length == 0) { 1847 return true; 1848 } 1849 1850 for (String target : this.targets) { 1851 for (TableName tableName : tableNames) { 1852 if (target.equals(tableName.getNameAsString())) { 1853 foundTableNames.add(target); 1854 } 1855 } 1856 } 1857 
1858 if (foundTableNames.size() > 0) { 1859 System.err.println("Cannot pass a tablename when using the -regionserver " 1860 + "option, tablenames:" + foundTableNames.toString()); 1861 this.errorCode = USAGE_EXIT_CODE; 1862 } 1863 return foundTableNames.isEmpty(); 1864 } 1865 1866 private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap, 1867 RegionServerStdOutSink regionServerSink) { 1868 List<RegionServerTask> tasks = new ArrayList<>(); 1869 Map<String, AtomicLong> successMap = new HashMap<>(); 1870 for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) { 1871 String serverName = entry.getKey(); 1872 AtomicLong successes = new AtomicLong(0); 1873 successMap.put(serverName, successes); 1874 if (entry.getValue().isEmpty()) { 1875 LOG.error("Regionserver not serving any regions - {}", serverName); 1876 } else if (this.allRegions) { 1877 for (RegionInfo region : entry.getValue()) { 1878 tasks.add(new RegionServerTask(this.connection, serverName, region, regionServerSink, 1879 this.rawScanEnabled, successes)); 1880 } 1881 } else { 1882 // random select a region if flag not set 1883 RegionInfo region = 1884 entry.getValue().get(ThreadLocalRandom.current().nextInt(entry.getValue().size())); 1885 tasks.add(new RegionServerTask(this.connection, serverName, region, regionServerSink, 1886 this.rawScanEnabled, successes)); 1887 } 1888 } 1889 try { 1890 for (Future<Void> future : this.executor.invokeAll(tasks)) { 1891 try { 1892 future.get(); 1893 } catch (ExecutionException e) { 1894 LOG.error("Sniff regionserver failed!", e); 1895 this.errorCode = ERROR_EXIT_CODE; 1896 } 1897 } 1898 if (this.allRegions) { 1899 for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) { 1900 String serverName = entry.getKey(); 1901 LOG.info("Successfully read {} regions out of {} on regionserver {}", 1902 successMap.get(serverName), entry.getValue().size(), serverName); 1903 } 1904 } 1905 } catch (InterruptedException e) { 1906 this.errorCode = 
ERROR_EXIT_CODE; 1907 LOG.error("Sniff regionserver interrupted!", e); 1908 } 1909 } 1910 1911 private Map<String, List<RegionInfo>> filterRegionServerByName() { 1912 Map<String, List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName(); 1913 regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap); 1914 return regionServerAndRegionsMap; 1915 } 1916 1917 private Map<String, List<RegionInfo>> getAllRegionServerByName() { 1918 Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>(); 1919 try { 1920 LOG.debug("Reading list of tables and locations"); 1921 List<TableDescriptor> tableDescs = this.admin.listTableDescriptors(); 1922 List<RegionInfo> regions = null; 1923 for (TableDescriptor tableDesc : tableDescs) { 1924 try (RegionLocator regionLocator = 1925 this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) { 1926 for (HRegionLocation location : regionLocator.getAllRegionLocations()) { 1927 if (location == null) { 1928 LOG.warn("Null location"); 1929 continue; 1930 } 1931 ServerName rs = location.getServerName(); 1932 String rsName = rs.getHostname(); 1933 RegionInfo r = location.getRegion(); 1934 if (rsAndRMap.containsKey(rsName)) { 1935 regions = rsAndRMap.get(rsName); 1936 } else { 1937 regions = new ArrayList<>(); 1938 rsAndRMap.put(rsName, regions); 1939 } 1940 regions.add(r); 1941 } 1942 } 1943 } 1944 1945 // get any live regionservers not serving any regions 1946 for (ServerName rs : this.admin.getRegionServers()) { 1947 String rsName = rs.getHostname(); 1948 if (!rsAndRMap.containsKey(rsName)) { 1949 rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList()); 1950 } 1951 } 1952 } catch (IOException e) { 1953 LOG.error("Get HTables info failed", e); 1954 this.errorCode = INIT_ERROR_EXIT_CODE; 1955 } 1956 return rsAndRMap; 1957 } 1958 1959 private Map<String, List<RegionInfo>> 1960 doFilterRegionServerByName(Map<String, List<RegionInfo>> fullRsAndRMap) { 1961 1962 Map<String, 
List<RegionInfo>> filteredRsAndRMap = null; 1963 1964 if (this.targets != null && this.targets.length > 0) { 1965 filteredRsAndRMap = new HashMap<>(); 1966 Pattern pattern = null; 1967 Matcher matcher = null; 1968 boolean regExpFound = false; 1969 for (String rsName : this.targets) { 1970 if (this.useRegExp) { 1971 regExpFound = false; 1972 pattern = Pattern.compile(rsName); 1973 for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) { 1974 matcher = pattern.matcher(entry.getKey()); 1975 if (matcher.matches()) { 1976 filteredRsAndRMap.put(entry.getKey(), entry.getValue()); 1977 regExpFound = true; 1978 } 1979 } 1980 if (!regExpFound) { 1981 LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName); 1982 } 1983 } else { 1984 if (fullRsAndRMap.containsKey(rsName)) { 1985 filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName)); 1986 } else { 1987 LOG.info("No RegionServerInfo found, regionServerName {}", rsName); 1988 } 1989 } 1990 } 1991 } else { 1992 filteredRsAndRMap = fullRsAndRMap; 1993 } 1994 return filteredRsAndRMap; 1995 } 1996 } 1997 1998 public static void main(String[] args) throws Exception { 1999 final Configuration conf = HBaseConfiguration.create(); 2000 2001 int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM); 2002 LOG.info("Execution thread count={}", numThreads); 2003 2004 int exitCode; 2005 ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads); 2006 try { 2007 exitCode = ToolRunner.run(conf, new CanaryTool(executor), args); 2008 } finally { 2009 executor.shutdown(); 2010 } 2011 System.exit(exitCode); 2012 } 2013}