/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coordination;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKMetadata;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ZooKeeper based implementation of {@link SplitLogManagerCoordination}
 */
@InterfaceAudience.Private
public class ZKSplitLogManagerCoordination extends ZKListener
  implements SplitLogManagerCoordination {

  public static final int DEFAULT_TIMEOUT = 120000;
  public static final int DEFAULT_ZK_RETRIES = 3;
  public static final int DEFAULT_MAX_RESUBMIT = 3;

  // NOTE(review): the logger is keyed on the interface class rather than on this class. It is
  // left untouched so that existing logger names (and any configuration matching them) stay valid.
  private static final Logger LOG = LoggerFactory.getLogger(SplitLogManagerCoordination.class);

  private final TaskFinisher taskFinisher;
  private final Configuration conf;

  private long zkretries;
  private long resubmitThreshold;
  private long timeout;

  SplitLogManagerDetails details;

  public boolean ignoreZKDeleteForTesting = false;

  /**
   * Creates the coordination and installs a {@link TaskFinisher} that finishes a split log file
   * through {@link WALSplitUtil#finishSplitLogFile(String, Configuration)}.
   * @param conf    configuration read by {@link #init()} and by the task finisher
   * @param watcher ZooKeeper watcher handed to the {@link ZKListener} super class
   */
  public ZKSplitLogManagerCoordination(Configuration conf, ZKWatcher watcher) {
    super(watcher);
    this.conf = conf;
    taskFinisher = new TaskFinisher() {
      @Override
      public Status finish(ServerName workerName, String logfile) {
        try {
          WALSplitUtil.finishSplitLogFile(logfile, conf);
        } catch (IOException e) {
          LOG.warn("Could not finish splitting of log file {}", logfile, e);
          return Status.ERR;
        }
        return Status.DONE;
      }
    };
  }

  /**
   * Reads the retry count, resubmit threshold and timeout from the configuration and, when a
   * watcher is present, registers this listener and scans for orphan task znodes.
   */
  @Override
  public void init() throws IOException {
    this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
    this.resubmitThreshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
    this.timeout = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, DEFAULT_TIMEOUT);
    if (this.watcher != null) {
      this.watcher.registerListener(this);
      lookForOrphans();
    }
  }

  /**
   * @return the encoded znode name for the given task name
   */
  @Override
  public String prepareTask(String taskname) {
    return ZKSplitLog.getEncodedNodeName(watcher, taskname);
  }

  /**
   * Counts the children of the split log znode that are not RESCAN nodes.
   * @return the number of remaining tasks, or -1 if the children could not be listed
   */
  @Override
  public int remainingTasksInCoordination() {
    int count = 0;
    try {
      List<String> tasks =
        ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().splitLogZNode);
      if (tasks != null) {
        for (String task : tasks) {
          if (!ZKSplitLog.isRescanNode(task)) {
            count++;
          }
        }
      }
    } catch (KeeperException ke) {
      LOG.warn("Failed to check remaining tasks", ke);
      count = -1;
    }
    return count;
  }

  /**
   * It is possible for a task to stay in UNASSIGNED state indefinitely - say SplitLogManager wants
   * to resubmit a task. It forces the task to UNASSIGNED state but it dies before it could create
   * the RESCAN task node to signal the SplitLogWorkers to pick up the task. To prevent this
   * scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup.
   */
  private void handleUnassignedTask(String path) {
    if (ZKSplitLog.isRescanNode(watcher, path)) {
      return;
    }
    Task task = findOrCreateOrphanTask(path);
    if (task.isOrphan() && (task.incarnation.get() == 0)) {
      LOG.info("Resubmitting unassigned orphan task {}", path);
      // ignore failure to resubmit. The timeout-monitor will handle it later
      // albeit in a more crude fashion
      resubmitTask(path, task, FORCE);
    }
  }

  /**
   * Queues an asynchronous delete of the task znode using the configured retry count.
   */
  @Override
  public void deleteTask(String path) {
    deleteNode(path, zkretries);
  }

  /**
   * Tries to put an in-progress task back into the UNASSIGNED state.
   * <p>
   * Unless the directive is FORCE, the task is only resubmitted when its worker is no longer
   * online or the task has not been updated within the timeout, and only while the number of
   * unforced resubmits is below the configured threshold.
   * @return true if the znode was reset and a RESCAN node creation was queued, false otherwise
   */
  @Override
  public boolean resubmitTask(String path, Task task, ResubmitDirective directive) {
    // its ok if this thread misses the update to task.deleted. It will fail later
    if (task.status != IN_PROGRESS) {
      return false;
    }
    int version;
    if (directive != FORCE) {
      // We're going to resubmit:
      // 1) immediately if the worker server is now marked as dead
      // 2) after a configurable timeout if the server is not marked as dead but has still not
      // finished the task. This allows to continue if the worker cannot actually handle it,
      // for any reason.
      final long time = EnvironmentEdgeManager.currentTime() - task.last_update;
      // without a server manager the worker is assumed to be alive
      final boolean alive = details.getMaster().getServerManager() == null
        || details.getMaster().getServerManager().isServerOnline(task.cur_worker_name);
      if (alive && time < timeout) {
        LOG.trace(
          "Skipping the resubmit of {} because the server {} is not marked as dead, we waited for "
            + "{} while the timeout is {}",
          task, task.cur_worker_name, time, timeout);
        return false;
      }

      if (task.unforcedResubmits.get() >= resubmitThreshold) {
        if (!task.resubmitThresholdReached) {
          task.resubmitThresholdReached = true;
          SplitLogCounters.tot_mgr_resubmit_threshold_reached.increment();
          LOG.info("Skipping resubmissions of task {} because threshold {} reached", path,
            resubmitThreshold);
        }
        return false;
      }
      // race with heartbeat() that might be changing last_version
      version = task.last_version;
    } else {
      SplitLogCounters.tot_mgr_resubmit_force.increment();
      version = -1;
    }
    LOG.info("Resubmitting task {}", path);
    task.incarnation.incrementAndGet();
    boolean result = resubmit(path, version);
    if (!result) {
      task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
      return false;
    }
    // don't count forced resubmits
    if (directive != FORCE) {
      task.unforcedResubmits.incrementAndGet();
    }
    task.setUnassigned();
    rescan(Long.MAX_VALUE);
    SplitLogCounters.tot_mgr_resubmit.increment();
    return true;
  }

  /**
   * Queues creation of a RESCAN node.
   */
  @Override
  public void checkTasks() {
    rescan(Long.MAX_VALUE);
  }

  /**
   * signal the workers that a task was resubmitted by creating the RESCAN node.
   */
  private void rescan(long retries) {
    // The RESCAN node will be deleted almost immediately by the
    // SplitLogManager as soon as it is created because it is being
    // created in the DONE state. This behavior prevents a buildup
    // of RESCAN nodes. But there is also a chance that a SplitLogWorker
    // might miss the watch-trigger that creation of RESCAN node provides.
    // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
    // therefore this behavior is safe.
    SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName());
    this.watcher.getRecoverableZooKeeper().getZooKeeper().create(ZKSplitLog.getRescanNode(watcher),
      slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
      new CreateRescanAsyncCallback(), Long.valueOf(retries));
  }

  /**
   * Queues asynchronous creation of the task znode using the configured retry count.
   */
  @Override
  public void submitTask(String path) {
    createNode(path, zkretries);
  }

  /**
   * Queues an asynchronous get-data (with watch) on the task znode; errors are not retried.
   */
  @Override
  public void checkTaskStillAvailable(String path) {
    // A negative retry count will lead to ignoring all error processing.
    this.watcher.getRecoverableZooKeeper().getZooKeeper().getData(path, this.watcher,
      new GetDataAsyncCallback(), Long.valueOf(-1) /* retry count */);
    SplitLogCounters.tot_mgr_get_data_queued.increment();
  }

  /**
   * Queues an asynchronous, version-agnostic delete of the znode.
   * @param retries remaining retries, passed to {@link DeleteAsyncCallback} as its context
   */
  private void deleteNode(String path, Long retries) {
    SplitLogCounters.tot_mgr_node_delete_queued.increment();
    // Once a task znode is ready for delete, that is it is in the TASK_DONE
    // state, then no one should be writing to it anymore. That is no one
    // will be updating the znode version any more.
    this.watcher.getRecoverableZooKeeper().getZooKeeper().delete(path, -1,
      new DeleteAsyncCallback(), retries);
  }

  /**
   * Removes the in-memory task for a deleted znode, marks it DELETED and notifies its waiters.
   * Does nothing when {@link #ignoreZKDeleteForTesting} is set.
   */
  private void deleteNodeSuccess(String path) {
    if (ignoreZKDeleteForTesting) {
      return;
    }
    Task task = details.getTasks().remove(path);
    if (task == null) {
      if (ZKSplitLog.isRescanNode(watcher, path)) {
        SplitLogCounters.tot_mgr_rescan_deleted.increment();
      }
      SplitLogCounters.tot_mgr_missing_state_in_delete.increment();
      LOG.debug("Deleted task without in memory state {}", path);
      return;
    }
    synchronized (task) {
      task.status = DELETED;
      task.notify();
    }
    SplitLogCounters.tot_mgr_task_deleted.increment();
  }

  /**
   * Logs that the znode could not be deleted.
   */
  private void deleteNodeFailure(String path) {
    LOG.info("Failed to delete node {} and will retry soon.", path);
  }

  /**
   * Counts the created RESCAN node and sets a data watch on it.
   */
  private void createRescanSuccess(String path) {
    SplitLogCounters.tot_mgr_rescan.increment();
    getDataSetWatch(path, zkretries);
  }

  /**
   * Logs that creating the RESCAN node ran out of retries.
   */
  private void createRescanFailure() {
    LOG.error(HBaseMarkers.FATAL, "logic failure, rescan failure must not happen");
  }

  /**
   * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
   * @param statusCode integer value of a ZooKeeper exception code
   * @param action     description message about the retried action
   * @return true when need to abandon retries otherwise false
   */
  private boolean needAbandonRetries(int statusCode, String action) {
    if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
      LOG.error(
        "ZK session expired. Master is expected to shut down. Abandoning retries for action={}",
        action);
      return true;
    }
    return false;
  }

  /**
   * Queues asynchronous creation of a task znode holding an UNASSIGNED task.
   * @param retry_count remaining retries, passed to {@link CreateAsyncCallback} as its context
   */
  private void createNode(String path, Long retry_count) {
    SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName());
    ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
      retry_count);
    SplitLogCounters.tot_mgr_node_create_queued.increment();
  }

  /**
   * Sets a data watch on the task znode that was just put up.
   */
  private void createNodeSuccess(String path) {
    LOG.debug("Put up splitlog task at znode {}", path);
    getDataSetWatch(path, zkretries);
  }

  /**
   * Marks the task as failed because its znode could not be created.
   */
  private void createNodeFailure(String path) {
    // TODO the Manager should split the log locally instead of giving up
    LOG.warn("Failed to create task node {}", path);
    setDone(path, FAILURE);
  }

  /**
   * Queues an asynchronous get-data on the znode, registering the watcher.
   * @param retry_count remaining retries, passed to {@link GetDataAsyncCallback} as its context
   */
  private void getDataSetWatch(String path, Long retry_count) {
    this.watcher.getRecoverableZooKeeper().getZooKeeper().getData(path, this.watcher,
      new GetDataAsyncCallback(), retry_count);
    SplitLogCounters.tot_mgr_get_data_queued.increment();
  }

  /**
   * Dispatches on the task state read from the znode: unassigned tasks may be resubmitted, owned
   * tasks are heartbeated, resigned and errored tasks are resubmitted or failed, done tasks are
   * finished through the {@link TaskFinisher}.
   * @param data    raw znode content; null together with a version of
   *                {@link Integer#MIN_VALUE} means the znode disappeared
   * @param version znode version the data was read at
   * @throws DeserializationException if the znode content cannot be parsed
   */
  private void getDataSetWatchSuccess(String path, byte[] data, int version)
    throws DeserializationException {
    if (data == null) {
      if (version == Integer.MIN_VALUE) {
        // assume all done. The task znode suddenly disappeared.
        setDone(path, SUCCESS);
        return;
      }
      SplitLogCounters.tot_mgr_null_data.increment();
      LOG.error(HBaseMarkers.FATAL, "logic error - got null data {}", path);
      setDone(path, FAILURE);
      return;
    }
    data = ZKMetadata.removeMetaData(data);
    SplitLogTask slt = SplitLogTask.parseFrom(data);
    if (slt.isUnassigned()) {
      LOG.debug("Task not yet acquired {}, ver={}", path, version);
      handleUnassignedTask(path);
    } else if (slt.isOwned()) {
      heartbeat(path, version, slt.getServerName());
    } else if (slt.isResigned()) {
      LOG.info("Task {} entered state={}", path, slt);
      resubmitOrFail(path, FORCE);
    } else if (slt.isDone()) {
      LOG.info("Task {} entered state={}", path, slt);
      if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
        if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
          setDone(path, SUCCESS);
        } else {
          resubmitOrFail(path, CHECK);
        }
      } else {
        setDone(path, SUCCESS);
      }
    } else if (slt.isErr()) {
      LOG.info("Task {} entered state={}", path, slt);
      resubmitOrFail(path, CHECK);
    } else {
      LOG.error(HBaseMarkers.FATAL, "logic error - unexpected zk state for path = {} data = {}",
        path, slt);
      setDone(path, FAILURE);
    }
  }

  /**
   * Resubmits the task and, if that is not possible, marks it as failed.
   */
  private void resubmitOrFail(String path, ResubmitDirective directive) {
    if (!resubmitTask(path, findOrCreateOrphanTask(path), directive)) {
      setDone(path, FAILURE);
    }
  }

  /**
   * Marks the task as failed because its data watch could not be set.
   */
  private void getDataSetWatchFailure(String path) {
    LOG.warn("Failed to set data watch {}", path);
    setDone(path, FAILURE);
  }

  /**
   * Moves an in-progress task to the given termination status, updates and notifies its batch,
   * and queues deletion of the task znode.
   */
  private void setDone(String path, TerminationStatus status) {
    Task task = details.getTasks().get(path);
    if (task == null) {
      if (!ZKSplitLog.isRescanNode(watcher, path)) {
        SplitLogCounters.tot_mgr_unacquired_orphan_done.increment();
        LOG.debug("Unacquired orphan task is done {}", path);
      }
    } else {
      synchronized (task) {
        if (task.status == IN_PROGRESS) {
          if (status == SUCCESS) {
            SplitLogCounters.tot_mgr_log_split_success.increment();
            LOG.info("Done splitting {}", path);
          } else {
            SplitLogCounters.tot_mgr_log_split_err.increment();
            LOG.warn("Error splitting {}", path);
          }
          task.status = status;
          if (task.batch != null) {
            synchronized (task.batch) {
              if (status == SUCCESS) {
                task.batch.done++;
              } else {
                task.batch.error++;
              }
              task.batch.notify();
            }
          }
        }
      }
    }
    // delete the task node in zk. It's an async
    // call and no one is blocked waiting for this node to be deleted. All
    // task names are unique (log.<timestamp>) there is no risk of deleting
    // a future task.
    // if a deletion fails, TimeoutMonitor will retry the same deletion later
    deleteNode(path, zkretries);
  }

  /**
   * Returns the in-memory task for the path, creating (and counting) an orphan task if none is
   * registered yet.
   */
  private Task findOrCreateOrphanTask(String path) {
    return computeIfAbsent(details.getTasks(), path, Task::new, () -> {
      LOG.info("Creating orphan task {}", path);
      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
    });
  }

  /**
   * Records a heartbeat for the task when the znode version differs from the last one seen.
   */
  private void heartbeat(String path, int new_version, ServerName workerName) {
    Task task = findOrCreateOrphanTask(path);
    if (new_version != task.last_version) {
      if (task.isUnassigned()) {
        LOG.info("Task {} acquired by {}", path, workerName);
      }
      task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName);
      SplitLogCounters.tot_mgr_heartbeat.increment();
    } else {
      // duplicate heartbeats - heartbeats w/o zk node version
      // changing - are possible. The timeout thread does
      // getDataSetWatch() just to check whether a node still
      // exists or not
    }
  }

  /**
   * Lists the children of the split log znode and sets a data watch on each of them, logging how
   * many are orphan tasks and how many are RESCAN nodes.
   */
  private void lookForOrphans() {
    List<String> orphans;
    try {
      orphans =
        ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.getZNodePaths().splitLogZNode);
      if (orphans == null) {
        LOG.warn("Could not get children of {}", this.watcher.getZNodePaths().splitLogZNode);
        return;
      }
    } catch (KeeperException e) {
      LOG.warn("Could not get children of {} {}", this.watcher.getZNodePaths().splitLogZNode,
        StringUtils.stringifyException(e));
      return;
    }
    int rescan_nodes = 0;
    for (String path : orphans) {
      String nodepath = ZNodePaths.joinZNode(watcher.getZNodePaths().splitLogZNode, path);
      if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
        rescan_nodes++;
        LOG.debug("Found orphan rescan node {}", path);
      } else {
        LOG.info("Found orphan task {}", path);
      }
      getDataSetWatch(nodepath, zkretries);
    }
    LOG.info("Found {} orphan tasks and {} rescan nodes", orphans.size() - rescan_nodes,
      rescan_nodes);
  }

  /**
   * Re-reads (and re-watches) a znode whose data changed, if it is a known task or a RESCAN node.
   */
  @Override
  public void nodeDataChanged(String path) {
    Task task = details.getTasks().get(path);
    if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
      if (task != null) {
        task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
      }
      getDataSetWatch(path, zkretries);
    }
  }

  /**
   * Overwrites the task znode with an UNASSIGNED task, conditional on the given version.
   * @param version expected znode version passed to the set-data call
   * @return true if the znode was updated, false on any failure
   */
  private boolean resubmit(String path, int version) {
    try {
      // blocking zk call but this is done from the timeout thread
      SplitLogTask slt = new SplitLogTask.Unassigned(this.details.getServerName());
      if (!ZKUtil.setData(this.watcher, path, slt.toByteArray(), version)) {
        LOG.debug("Failed to resubmit task {} version changed", path);
        return false;
      }
    } catch (NoNodeException e) {
      LOG.warn("Failed to resubmit because znode doesn't exist {}"
        + " task done (or forced done by removing the znode)", path);
      try {
        // null data with Integer.MIN_VALUE tells the handler that the znode disappeared
        getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
      } catch (DeserializationException e1) {
        LOG.debug("Failed to re-resubmit task {} because of deserialization issue", path, e1);
        return false;
      }
      return false;
    } catch (KeeperException.BadVersionException e) {
      LOG.debug("Failed to resubmit task {} version changed", path);
      return false;
    } catch (KeeperException e) {
      SplitLogCounters.tot_mgr_resubmit_failed.increment();
      LOG.warn("Failed to resubmit {}", path, e);
      return false;
    }
    return true;
  }

  /**
   * {@link org.apache.hadoop.hbase.master.SplitLogManager} can use objects implementing this
   * interface to finish off a partially done task by
   * {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}. This provides a serialization
   * point at the end of the task processing. Must be restartable and idempotent.
   */
  public interface TaskFinisher {
    /**
     * status that can be returned by finish()
     */
    enum Status {
      /**
       * task completed successfully
       */
      DONE(),
      /**
       * task completed with error
       */
      ERR();
    }

    /**
     * finish the partially done task. workername provides clue to where the partial results of the
     * partially done tasks are present. taskname is the name of the task that was put up in
     * zookeeper.
     * <p>
     * @return DONE if task completed successfully, ERR otherwise
     */
    Status finish(ServerName workerName, String taskname);
  }

  /**
   * Asynchronous handler for zk create node results. Retries on failures.
   */
  public class CreateAsyncCallback implements AsyncCallback.StringCallback {
    private final Logger LOG = LoggerFactory.getLogger(CreateAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      SplitLogCounters.tot_mgr_node_create_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "Create znode " + path)) {
          createNodeFailure(path);
          return;
        }
        if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
          // What if there is a delete pending against this pre-existing
          // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
          // state. Only operations that will be carried out on this node by
          // this manager are get-znode-data, task-finisher and delete-znode.
          // And all code pieces correctly handle the case of suddenly
          // disappearing task-znode.
          LOG.debug("Found pre-existing znode {}", path);
          SplitLogCounters.tot_mgr_node_already_exists.increment();
        } else {
          Long retry_count = (Long) ctx;
          LOG.warn("Create rc={} for {} remaining retries={}", KeeperException.Code.get(rc), path,
            retry_count);
          if (retry_count == 0) {
            SplitLogCounters.tot_mgr_node_create_err.increment();
            createNodeFailure(path);
          } else {
            SplitLogCounters.tot_mgr_node_create_retry.increment();
            createNode(path, retry_count - 1);
          }
          return;
        }
      }
      // reached on success and when the znode already existed
      createNodeSuccess(path);
    }
  }

  /**
   * Asynchronous handler for zk get-data-set-watch on node results. Retries on failures.
   */
  public class GetDataAsyncCallback implements AsyncCallback.DataCallback {
    private final Logger LOG = LoggerFactory.getLogger(GetDataAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
      SplitLogCounters.tot_mgr_get_data_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "GetData from znode " + path)) {
          return;
        }
        if (rc == KeeperException.Code.NONODE.intValue()) {
          SplitLogCounters.tot_mgr_get_data_nonode.increment();
          LOG.warn("Task znode {} vanished or not created yet.", path);
          // ignore since we should not end up in a case where there is in-memory task,
          // but no znode. The only case is between the time task is created in-memory
          // and the znode is created. See HBASE-11217.
          return;
        }
        Long retry_count = (Long) ctx;

        if (retry_count < 0) {
          LOG.warn("Getdata rc={} {}. Ignoring error. No error handling. No retrying.",
            KeeperException.Code.get(rc), path);
          return;
        }
        LOG.warn("Getdata rc={} {} remaining retries={}", KeeperException.Code.get(rc), path,
          retry_count);
        if (retry_count == 0) {
          SplitLogCounters.tot_mgr_get_data_err.increment();
          getDataSetWatchFailure(path);
        } else {
          SplitLogCounters.tot_mgr_get_data_retry.increment();
          getDataSetWatch(path, retry_count - 1);
        }
        return;
      }
      try {
        getDataSetWatchSuccess(path, data, stat.getVersion());
      } catch (DeserializationException e) {
        LOG.warn("Deserialization problem", e);
      }
    }
  }

  /**
   * Asynchronous handler for zk delete node results. Retries on failures.
   */
  public class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
    private final Logger LOG = LoggerFactory.getLogger(DeleteAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx) {
      SplitLogCounters.tot_mgr_node_delete_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "Delete znode " + path)) {
          details.getFailedDeletions().add(path);
          return;
        }
        if (rc != KeeperException.Code.NONODE.intValue()) {
          SplitLogCounters.tot_mgr_node_delete_err.increment();
          Long retry_count = (Long) ctx;
          LOG.warn("Delete rc={} for {} remaining retries={}", KeeperException.Code.get(rc), path,
            retry_count);
          if (retry_count == 0) {
            LOG.warn("Delete failed {}", path);
            details.getFailedDeletions().add(path);
            deleteNodeFailure(path);
          } else {
            deleteNode(path, retry_count - 1);
          }
          return;
        } else {
          LOG.info("{} does not exist. Either was created but deleted behind our"
            + " back by another pending delete OR was deleted"
            + " in earlier retry rounds. zkretries = {}", path, ctx);
        }
      } else {
        LOG.debug("Deleted {}", path);
      }
      // reached on success and when the znode was already gone
      deleteNodeSuccess(path);
    }
  }

  /**
   * Asynchronous handler for zk create RESCAN-node results. Retries on failures.
   * <p>
   * A RESCAN node is created using EPHEMERAL_SEQUENTIAL flag. It is a signal for all the
   * {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}s to rescan for new tasks.
   */
  public class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
    private final Logger LOG = LoggerFactory.getLogger(CreateRescanAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      if (rc != 0) {
        if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
          return;
        }
        Long retry_count = (Long) ctx;
        LOG.warn("rc={} for {} remaining retries={}", KeeperException.Code.get(rc), path,
          retry_count);
        if (retry_count == 0) {
          createRescanFailure();
        } else {
          rescan(retry_count - 1);
        }
        return;
      }
      // path is the original arg, name is the actual name that was created
      createRescanSuccess(name);
    }
  }

  @Override
  public void setDetails(SplitLogManagerDetails details) {
    this.details = details;
  }

  @Override
  public SplitLogManagerDetails getDetails() {
    return details;
  }

  /**
   * Temporary function that is used by unit tests only
   */
  public void setIgnoreDeleteForTesting(boolean b) {
    ignoreZKDeleteForTesting = b;
  }
}