001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.concurrent.LinkedBlockingQueue; 022import org.apache.hadoop.hbase.PleaseHoldException; 023import org.apache.hadoop.hbase.client.ConnectionUtils; 024import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 025import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 026import org.apache.hadoop.hbase.util.Threads; 027import org.apache.yetus.audience.InterfaceAudience; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 032 033import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; 035 036/** 037 * A thread which calls {@code reportProcedureDone} to tell master the result of a remote procedure. 038 */ 039@InterfaceAudience.Private 040class RemoteProcedureResultReporter extends Thread { 041 042 private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureResultReporter.class); 043 044 private static final int MAX_BATCH = 100; 045 046 private final HRegionServer server; 047 048 private final LinkedBlockingQueue<RemoteProcedureResult> results = new LinkedBlockingQueue<>(); 049 050 public RemoteProcedureResultReporter(HRegionServer server) { 051 this.server = server; 052 } 053 054 public void complete(long procId, Throwable error) { 055 RemoteProcedureResult.Builder builder = RemoteProcedureResult.newBuilder().setProcId(procId); 056 if (error != null) { 057 LOG.debug("Failed to complete execution of pid={}", procId, error); 058 builder.setStatus(RemoteProcedureResult.Status.ERROR).setError( 059 ForeignExceptionUtil.toProtoForeignException(server.getServerName().toString(), error)); 060 } else { 061 LOG.debug("Successfully complete execution of pid={}", procId); 062 builder.setStatus(RemoteProcedureResult.Status.SUCCESS); 063 } 064 results.add(builder.build()); 065 } 066 067 @Override 068 public void run() { 069 ReportProcedureDoneRequest.Builder builder = ReportProcedureDoneRequest.newBuilder(); 070 int tries = 0; 071 while (!server.isStopped()) { 072 if (builder.getResultCount() == 0) { 073 try { 074 builder.addResult(results.take()); 075 } catch (InterruptedException e) { 076 Thread.currentThread().interrupt(); 077 continue; 078 } 079 } 080 while (builder.getResultCount() < MAX_BATCH) { 081 RemoteProcedureResult result = results.poll(); 082 if (result == null) { 083 break; 084 } 085 builder.addResult(result); 086 } 087 ReportProcedureDoneRequest request = builder.build(); 088 try { 089 server.reportProcedureDone(builder.build()); 090 builder.clear(); 091 tries = 0; 092 } catch (IOException e) { 093 boolean pause = 094 e instanceof ServerNotRunningYetException || e instanceof PleaseHoldException; 095 long pauseTime; 096 if (pause) { 097 // Do backoff else we flood the Master with requests. 098 pauseTime = ConnectionUtils.getPauseTime(server.getRetryPauseTime(), tries); 099 } else { 100 pauseTime = server.getRetryPauseTime(); // Reset. 101 } 102 LOG.info("Failed procedure report " + TextFormat.shortDebugString(request) + "; retry (#" 103 + tries + ")" 104 + (pause 105 ? " after " + pauseTime + "ms delay (Master is coming online...)." 106 : " immediately."), 107 e); 108 Threads.sleep(pauseTime); 109 tries++; 110 } 111 } 112 } 113}