001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.BufferedReader; 021import java.io.BufferedWriter; 022import java.io.File; 023import java.io.FileInputStream; 024import java.io.FileNotFoundException; 025import java.io.FileWriter; 026import java.io.FilenameFilter; 027import java.io.IOException; 028import java.io.InputStreamReader; 029import java.io.PrintStream; 030import java.util.ArrayList; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.List; 035import java.util.Map; 036import java.util.Scanner; 037import java.util.Set; 038import java.util.TreeMap; 039import java.util.regex.Matcher; 040import java.util.regex.Pattern; 041import org.apache.commons.io.FileUtils; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.hbase.HBaseTestingUtil; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.zookeeper.ZKUtil; 048import org.apache.hadoop.hdfs.MiniDFSCluster; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052/** 053 * A helper class for process-based mini-cluster tests. Unlike {@link SingleProcessHBaseCluster}, 054 * starts daemons as separate processes, allowing to do real kill testing. 055 */ 056public class ProcessBasedLocalHBaseCluster { 057 058 private final String hbaseHome, workDir; 059 private final Configuration conf; 060 private final int numMasters, numRegionServers, numDataNodes; 061 private final List<Integer> rsPorts, masterPorts; 062 063 private final int zkClientPort; 064 065 private static final int MAX_FILE_SIZE_OVERRIDE = 10 * 1000 * 1000; 066 067 private static final Logger LOG = LoggerFactory.getLogger(ProcessBasedLocalHBaseCluster.class); 068 069 private List<String> daemonPidFiles = Collections.synchronizedList(new ArrayList<String>()); 070 071 private boolean shutdownHookInstalled; 072 073 private String hbaseDaemonScript; 074 075 private MiniDFSCluster dfsCluster; 076 077 private HBaseTestingUtil testUtil; 078 079 private Thread logTailerThread; 080 081 private List<String> logTailDirs = Collections.synchronizedList(new ArrayList<String>()); 082 083 private static enum ServerType { 084 MASTER("master"), 085 RS("regionserver"), 086 ZK("zookeeper"); 087 088 private final String fullName; 089 090 private ServerType(String fullName) { 091 this.fullName = fullName; 092 } 093 } 094 095 /** 096 * Constructor. Modifies the passed configuration. 097 * @param conf the {@link Configuration} to use 098 * @param numDataNodes the number of data nodes 099 * @param numRegionServers the number of region servers 100 */ 101 public ProcessBasedLocalHBaseCluster(Configuration conf, int numDataNodes, int numRegionServers) { 102 this.conf = conf; 103 this.hbaseHome = HBaseHomePath.getHomePath(); 104 this.numMasters = 1; 105 this.numRegionServers = numRegionServers; 106 this.workDir = hbaseHome + "/target/local_cluster"; 107 this.numDataNodes = numDataNodes; 108 109 hbaseDaemonScript = hbaseHome + "/bin/hbase-daemon.sh"; 110 zkClientPort = HBaseTestingUtil.randomFreePort(); 111 112 this.rsPorts = sortedPorts(numRegionServers); 113 this.masterPorts = sortedPorts(numMasters); 114 115 conf.set(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST); 116 conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); 117 } 118 119 /** 120 * Makes this local HBase cluster use a mini-DFS cluster. Must be called before 121 * {@link #startHBase()}. 122 */ 123 public void startMiniDFS() throws Exception { 124 if (testUtil == null) { 125 testUtil = new HBaseTestingUtil(conf); 126 } 127 dfsCluster = testUtil.startMiniDFSCluster(numDataNodes); 128 } 129 130 /** 131 * Generates a list of random port numbers in the sorted order. A sorted order makes sense if we 132 * ever want to refer to these servers by their index in the returned array, e.g. server #0, #1, 133 * etc. 134 */ 135 private static List<Integer> sortedPorts(int n) { 136 List<Integer> ports = new ArrayList<>(n); 137 for (int i = 0; i < n; ++i) { 138 ports.add(HBaseTestingUtil.randomFreePort()); 139 } 140 Collections.sort(ports); 141 return ports; 142 } 143 144 public void startHBase() throws IOException { 145 startDaemonLogTailer(); 146 cleanupOldState(); 147 148 // start ZK 149 LOG.info("Starting ZooKeeper on port " + zkClientPort); 150 startZK(); 151 152 HBaseTestingUtil.waitForHostPort(HConstants.LOCALHOST, zkClientPort); 153 154 for (int masterPort : masterPorts) { 155 startMaster(masterPort); 156 } 157 158 ZKUtil.waitForBaseZNode(conf); 159 160 for (int rsPort : rsPorts) { 161 startRegionServer(rsPort); 162 } 163 164 LOG.info("Waiting for HBase startup by scanning META"); 165 int attemptsLeft = 10; 166 while (attemptsLeft-- > 0) { 167 try { 168 testUtil.getConnection().getTable(TableName.META_TABLE_NAME); 169 } catch (Exception e) { 170 LOG.info("Waiting for HBase to startup. Retries left: " + attemptsLeft, e); 171 Threads.sleep(1000); 172 } 173 } 174 175 LOG.info("Process-based HBase Cluster with " + numRegionServers 176 + " region servers up and running... \n\n"); 177 } 178 179 public void startRegionServer(int port) { 180 startServer(ServerType.RS, port); 181 } 182 183 public void startMaster(int port) { 184 startServer(ServerType.MASTER, port); 185 } 186 187 public void killRegionServer(int port) throws IOException { 188 killServer(ServerType.RS, port); 189 } 190 191 public void killMaster() throws IOException { 192 killServer(ServerType.MASTER, 0); 193 } 194 195 public void startZK() { 196 startServer(ServerType.ZK, 0); 197 } 198 199 private void executeCommand(String command) { 200 executeCommand(command, null); 201 } 202 203 private void executeCommand(String command, Map<String, String> envOverrides) { 204 ensureShutdownHookInstalled(); 205 LOG.debug("Command : " + command); 206 207 try { 208 String[] envp = null; 209 if (envOverrides != null) { 210 Map<String, String> map = new HashMap<>(System.getenv()); 211 map.putAll(envOverrides); 212 envp = new String[map.size()]; 213 int idx = 0; 214 for (Map.Entry<String, String> e : map.entrySet()) { 215 envp[idx++] = e.getKey() + "=" + e.getValue(); 216 } 217 } 218 219 Process p = Runtime.getRuntime().exec(command, envp); 220 221 BufferedReader stdInput = new BufferedReader(new InputStreamReader(p.getInputStream())); 222 BufferedReader stdError = new BufferedReader(new InputStreamReader(p.getErrorStream())); 223 224 // read the output from the command 225 String s = null; 226 while ((s = stdInput.readLine()) != null) { 227 System.out.println(s); 228 } 229 230 // read any errors from the attempted command 231 while ((s = stdError.readLine()) != null) { 232 System.out.println(s); 233 } 234 } catch (IOException e) { 235 LOG.error("Error running: " + command, e); 236 } 237 } 238 239 private void shutdownAllProcesses() { 240 LOG.info("Killing daemons using pid files"); 241 final List<String> pidFiles = new ArrayList<>(daemonPidFiles); 242 for (String pidFile : pidFiles) { 243 int pid = 0; 244 try { 245 pid = readPidFromFile(pidFile); 246 } catch (IOException ex) { 247 LOG.error("Could not read pid from file " + pidFile); 248 } 249 250 if (pid > 0) { 251 LOG.info("Killing pid " + pid + " (" + pidFile + ")"); 252 killProcess(pid); 253 } 254 } 255 } 256 257 private void ensureShutdownHookInstalled() { 258 if (shutdownHookInstalled) { 259 return; 260 } 261 262 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 263 @Override 264 public void run() { 265 shutdownAllProcesses(); 266 } 267 })); 268 269 shutdownHookInstalled = true; 270 } 271 272 private void cleanupOldState() { 273 executeCommand("rm -rf " + workDir); 274 } 275 276 private void writeStringToFile(String s, String fileName) { 277 try { 278 BufferedWriter out = new BufferedWriter(new FileWriter(fileName)); 279 out.write(s); 280 out.close(); 281 } catch (IOException e) { 282 LOG.error("Error writing to: " + fileName, e); 283 } 284 } 285 286 private String serverWorkingDir(ServerType serverType, int port) { 287 return workDir + "/" + serverType + "-" + port; 288 } 289 290 private int getServerPID(ServerType serverType, int port) throws IOException { 291 String pidFile = pidFilePath(serverType, port); 292 return readPidFromFile(pidFile); 293 } 294 295 private static int readPidFromFile(String pidFile) throws IOException { 296 Scanner scanner = new Scanner(new File(pidFile)); 297 try { 298 return scanner.nextInt(); 299 } finally { 300 scanner.close(); 301 } 302 } 303 304 private String pidFilePath(ServerType serverType, int port) { 305 String dir = serverWorkingDir(serverType, port); 306 String user = System.getenv("USER"); 307 String pidFile = String.format("%s/hbase-%s-%s.pid", dir, user, serverType.fullName); 308 return pidFile; 309 } 310 311 private void killServer(ServerType serverType, int port) throws IOException { 312 int pid = getServerPID(serverType, port); 313 if (pid > 0) { 314 LOG.info("Killing " + serverType + "; pid=" + pid); 315 killProcess(pid); 316 } 317 } 318 319 private void killProcess(int pid) { 320 String cmd = "kill -s KILL " + pid; 321 executeCommand(cmd); 322 } 323 324 private void startServer(ServerType serverType, int rsPort) { 325 // create working directory for this region server. 326 String dir = serverWorkingDir(serverType, rsPort); 327 String confStr = generateConfig(serverType, rsPort, dir); 328 LOG.debug("Creating directory " + dir); 329 new File(dir).mkdirs(); 330 331 writeStringToFile(confStr, dir + "/hbase-site.xml"); 332 333 // Set debug options to an empty string so that hbase-config.sh does not configure them 334 // using default ports. If we want to run remote debugging on process-based local cluster's 335 // daemons, we can automatically choose non-conflicting JDWP and JMX ports for each daemon 336 // and specify them here. 337 writeStringToFile("unset HBASE_MASTER_OPTS\n" + "unset HBASE_REGIONSERVER_OPTS\n" 338 + "unset HBASE_ZOOKEEPER_OPTS\n" + "HBASE_MASTER_DBG_OPTS=' '\n" 339 + "HBASE_REGIONSERVER_DBG_OPTS=' '\n" + "HBASE_ZOOKEEPER_DBG_OPTS=' '\n" 340 + "HBASE_MASTER_JMX_OPTS=' '\n" + "HBASE_REGIONSERVER_JMX_OPTS=' '\n" 341 + "HBASE_ZOOKEEPER_JMX_OPTS=' '\n", dir + "/hbase-env.sh"); 342 343 Map<String, String> envOverrides = new HashMap<>(); 344 envOverrides.put("HBASE_LOG_DIR", dir); 345 envOverrides.put("HBASE_PID_DIR", dir); 346 try { 347 FileUtils.copyFile(new File(hbaseHome, "conf/log4j.properties"), 348 new File(dir, "log4j.properties")); 349 } catch (IOException ex) { 350 LOG.error("Could not install log4j.properties into " + dir); 351 } 352 353 executeCommand(hbaseDaemonScript + " --config " + dir + " start " + serverType.fullName, 354 envOverrides); 355 daemonPidFiles.add(pidFilePath(serverType, rsPort)); 356 logTailDirs.add(dir); 357 } 358 359 private final String generateConfig(ServerType serverType, int rpcPort, String daemonDir) { 360 StringBuilder sb = new StringBuilder(); 361 Map<String, Object> confMap = new TreeMap<>(); 362 confMap.put(HConstants.CLUSTER_DISTRIBUTED, true); 363 364 if (serverType == ServerType.MASTER) { 365 confMap.put(HConstants.MASTER_PORT, rpcPort); 366 367 int masterInfoPort = HBaseTestingUtil.randomFreePort(); 368 reportWebUIPort("master", masterInfoPort); 369 confMap.put(HConstants.MASTER_INFO_PORT, masterInfoPort); 370 } else if (serverType == ServerType.RS) { 371 confMap.put(HConstants.REGIONSERVER_PORT, rpcPort); 372 373 int rsInfoPort = HBaseTestingUtil.randomFreePort(); 374 reportWebUIPort("region server", rsInfoPort); 375 confMap.put(HConstants.REGIONSERVER_INFO_PORT, rsInfoPort); 376 } else { 377 confMap.put(HConstants.ZOOKEEPER_DATA_DIR, daemonDir); 378 } 379 380 confMap.put(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); 381 confMap.put(HConstants.HREGION_MAX_FILESIZE, MAX_FILE_SIZE_OVERRIDE); 382 383 if (dfsCluster != null) { 384 String fsURL = "hdfs://" + HConstants.LOCALHOST + ":" + dfsCluster.getNameNodePort(); 385 confMap.put("fs.defaultFS", fsURL); 386 confMap.put("hbase.rootdir", fsURL + "/hbase_test"); 387 } 388 389 sb.append("<configuration>\n"); 390 for (Map.Entry<String, Object> entry : confMap.entrySet()) { 391 sb.append(" <property>\n"); 392 sb.append(" <name>" + entry.getKey() + "</name>\n"); 393 sb.append(" <value>" + entry.getValue() + "</value>\n"); 394 sb.append(" </property>\n"); 395 } 396 sb.append("</configuration>\n"); 397 return sb.toString(); 398 } 399 400 private static void reportWebUIPort(String daemon, int port) { 401 LOG.info("Local " + daemon + " web UI is at http://" + HConstants.LOCALHOST + ":" + port); 402 } 403 404 public Configuration getConf() { 405 return conf; 406 } 407 408 public void shutdown() { 409 if (dfsCluster != null) { 410 dfsCluster.shutdown(); 411 } 412 shutdownAllProcesses(); 413 } 414 415 private static final Pattern TO_REMOVE_FROM_LOG_LINES_RE = 416 Pattern.compile("org\\.apache\\.hadoop\\.hbase\\."); 417 418 private static final Pattern LOG_PATH_FORMAT_RE = Pattern.compile("^.*/([A-Z]+)-(\\d+)/[^/]+$"); 419 420 private static String processLine(String line) { 421 Matcher m = TO_REMOVE_FROM_LOG_LINES_RE.matcher(line); 422 return m.replaceAll(""); 423 } 424 425 private final class LocalDaemonLogTailer implements Runnable { 426 private final Set<String> tailedFiles = new HashSet<>(); 427 private final List<String> dirList = new ArrayList<>(); 428 private final Object printLock = new Object(); 429 430 private final FilenameFilter LOG_FILES = new FilenameFilter() { 431 @Override 432 public boolean accept(File dir, String name) { 433 return name.endsWith(".out") || name.endsWith(".log"); 434 } 435 }; 436 437 @Override 438 public void run() { 439 try { 440 runInternal(); 441 } catch (IOException ex) { 442 LOG.error(ex.toString(), ex); 443 } 444 } 445 446 private void runInternal() throws IOException { 447 Thread.currentThread().setName(getClass().getSimpleName()); 448 while (true) { 449 scanDirs(); 450 try { 451 Thread.sleep(500); 452 } catch (InterruptedException e) { 453 LOG.error("Log tailer thread interrupted", e); 454 break; 455 } 456 } 457 } 458 459 private void scanDirs() throws FileNotFoundException { 460 dirList.clear(); 461 dirList.addAll(logTailDirs); 462 for (String d : dirList) { 463 for (File f : new File(d).listFiles(LOG_FILES)) { 464 String filePath = f.getAbsolutePath(); 465 if (!tailedFiles.contains(filePath)) { 466 tailedFiles.add(filePath); 467 startTailingFile(filePath); 468 } 469 } 470 } 471 } 472 473 private void startTailingFile(final String filePath) throws FileNotFoundException { 474 final PrintStream dest = filePath.endsWith(".log") ? System.err : System.out; 475 final ServerType serverType; 476 final int serverPort; 477 Matcher m = LOG_PATH_FORMAT_RE.matcher(filePath); 478 if (m.matches()) { 479 serverType = ServerType.valueOf(m.group(1)); 480 serverPort = Integer.valueOf(m.group(2)); 481 } else { 482 LOG.error("Unrecognized log path format: " + filePath); 483 return; 484 } 485 final String logMsgPrefix = 486 "[" + serverType + (serverPort != 0 ? ":" + serverPort : "") + "] "; 487 488 LOG.debug("Tailing " + filePath); 489 Thread t = new Thread(new Runnable() { 490 @Override 491 public void run() { 492 try { 493 FileInputStream fis = new FileInputStream(filePath); 494 BufferedReader br = new BufferedReader(new InputStreamReader(fis)); 495 String line; 496 while (true) { 497 try { 498 Thread.sleep(200); 499 } catch (InterruptedException e) { 500 LOG.error("Tailer for " + filePath + " interrupted"); 501 break; 502 } 503 while ((line = br.readLine()) != null) { 504 line = logMsgPrefix + processLine(line); 505 synchronized (printLock) { 506 if (line.endsWith("\n")) { 507 dest.print(line); 508 } else { 509 dest.println(line); 510 } 511 dest.flush(); 512 } 513 } 514 } 515 } catch (IOException ex) { 516 LOG.error("Failed tailing " + filePath, ex); 517 } 518 } 519 }); 520 t.setDaemon(true); 521 t.setName("Tailer for " + filePath); 522 t.start(); 523 } 524 525 } 526 527 private void startDaemonLogTailer() { 528 logTailerThread = new Thread(new LocalDaemonLogTailer()); 529 logTailerThread.setDaemon(true); 530 logTailerThread.start(); 531 } 532 533}