001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure.flush;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Map;
025import java.util.Set;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.stream.StreamSupport;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseInterfaceAudience;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.client.TableDescriptor;
035import org.apache.hadoop.hbase.errorhandling.ForeignException;
036import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
037import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
038import org.apache.hadoop.hbase.master.MasterServices;
039import org.apache.hadoop.hbase.master.MetricsMaster;
040import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
041import org.apache.hadoop.hbase.procedure.Procedure;
042import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
043import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
044import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
045import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
046import org.apache.hadoop.hbase.security.User;
047import org.apache.hadoop.hbase.security.access.AccessChecker;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.Pair;
050import org.apache.hadoop.hbase.util.Strings;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.apache.zookeeper.KeeperException;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
057
058import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
060
061@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
062public class MasterFlushTableProcedureManager extends MasterProcedureManager {
063
064  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
065
066  public static final String FLUSH_PROCEDURE_ENABLED = "hbase.flush.procedure.enabled";
067
068  public static final boolean FLUSH_PROCEDURE_ENABLED_DEFAULT = true;
069
070  private static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.master.timeoutMillis";
071  private static final int FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000;
072  private static final String FLUSH_WAKE_MILLIS_KEY = "hbase.flush.master.wakeMillis";
073  private static final int FLUSH_WAKE_MILLIS_DEFAULT = 500;
074
075  private static final String FLUSH_PROC_POOL_THREADS_KEY = "hbase.flush.procedure.master.threads";
076  private static final int FLUSH_PROC_POOL_THREADS_DEFAULT = 1;
077
078  private static final Logger LOG = LoggerFactory.getLogger(MasterFlushTableProcedureManager.class);
079
080  private MasterServices master;
081  private ProcedureCoordinator coordinator;
082  private Map<TableName, Procedure> procMap = new HashMap<>();
083  private boolean stopped;
084
085  public MasterFlushTableProcedureManager() {
086  }
087
088  @Override
089  public void stop(String why) {
090    LOG.info("stop: " + why);
091    this.stopped = true;
092  }
093
094  @Override
095  public boolean isStopped() {
096    return this.stopped;
097  }
098
099  @Override
100  public void initialize(MasterServices master, MetricsMaster metricsMaster)
101    throws KeeperException, IOException, UnsupportedOperationException {
102    this.master = master;
103
104    // get the configuration for the coordinator
105    Configuration conf = master.getConfiguration();
106    long wakeFrequency = conf.getInt(FLUSH_WAKE_MILLIS_KEY, FLUSH_WAKE_MILLIS_DEFAULT);
107    long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
108    int threads = conf.getInt(FLUSH_PROC_POOL_THREADS_KEY, FLUSH_PROC_POOL_THREADS_DEFAULT);
109
110    // setup the procedure coordinator
111    String name = master.getServerName().toString();
112    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, threads);
113    ProcedureCoordinatorRpcs comms =
114      new ZKProcedureCoordinator(master.getZooKeeper(), getProcedureSignature(), name);
115
116    this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
117  }
118
119  @Override
120  public String getProcedureSignature() {
121    return FLUSH_TABLE_PROCEDURE_SIGNATURE;
122  }
123
124  @Override
125  public void execProcedure(ProcedureDescription desc) throws IOException {
126
127    TableName tableName = TableName.valueOf(desc.getInstance());
128
129    // call pre coproc hook
130    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
131    if (cpHost != null) {
132      cpHost.preTableFlush(tableName);
133    }
134
135    // Get the list of region servers that host the online regions for table.
136    // We use the procedure instance name to carry the table name from the client.
137    // It is possible that regions may move after we get the region server list.
138    // Each region server will get its own online regions for the table.
139    // We may still miss regions that need to be flushed.
140    List<Pair<RegionInfo, ServerName>> regionsAndLocations =
141      master.getAssignmentManager().getTableRegionsAndLocations(tableName, false);
142
143    Set<String> regionServers = new HashSet<>(regionsAndLocations.size());
144    for (Pair<RegionInfo, ServerName> region : regionsAndLocations) {
145      if (region != null && region.getFirst() != null && region.getSecond() != null) {
146        RegionInfo hri = region.getFirst();
147        if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
148        regionServers.add(region.getSecond().toString());
149      }
150    }
151
152    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
153
154    HBaseProtos.NameStringPair families = null;
155    for (HBaseProtos.NameStringPair nsp : desc.getConfigurationList()) {
156      if (HConstants.FAMILY_KEY_STR.equals(nsp.getName())) {
157        families = nsp;
158      }
159    }
160
161    byte[] procArgs;
162    if (families != null) {
163      TableDescriptor tableDescriptor = master.getTableDescriptors().get(tableName);
164      List<String> noSuchFamilies =
165        StreamSupport.stream(Strings.SPLITTER.split(families.getValue()).spliterator(), false)
166          .filter(cf -> !tableDescriptor.hasColumnFamily(Bytes.toBytes(cf))).toList();
167      if (!noSuchFamilies.isEmpty()) {
168        throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies
169          + " don't exist in table " + tableName.getNameAsString());
170      }
171      procArgs = families.toByteArray();
172    } else {
173      procArgs = new byte[0];
174    }
175
176    // Kick of the global procedure from the master coordinator to the region servers.
177    // We rely on the existing Distributed Procedure framework to prevent any concurrent
178    // procedure with the same name.
179    Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), procArgs,
180      Lists.newArrayList(regionServers));
181    monitor.rethrowException();
182    if (proc == null) {
183      String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '"
184        + desc.getInstance() + "'. " + "Another flush procedure is running?";
185      LOG.error(msg);
186      throw new IOException(msg);
187    }
188
189    procMap.put(tableName, proc);
190
191    try {
192      // wait for the procedure to complete. A timer thread is kicked off that should cancel this
193      // if it takes too long.
194      proc.waitForCompleted();
195      LOG.info("Done waiting - exec procedure " + desc.getSignature() + " for '"
196        + desc.getInstance() + "'");
197      LOG.info("Master flush table procedure is successful!");
198    } catch (InterruptedException e) {
199      ForeignException ee =
200        new ForeignException("Interrupted while waiting for flush table procdure to finish", e);
201      monitor.receive(ee);
202      Thread.currentThread().interrupt();
203    } catch (ForeignException e) {
204      ForeignException ee =
205        new ForeignException("Exception while waiting for flush table procdure to finish", e);
206      monitor.receive(ee);
207    }
208    monitor.rethrowException();
209  }
210
211  @Override
212  public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user)
213    throws IOException {
214    // Done by AccessController as part of preTableFlush coprocessor hook (legacy code path).
215    // In future, when we AC is removed for good, that check should be moved here.
216  }
217
218  @Override
219  public synchronized boolean isProcedureDone(ProcedureDescription desc) throws IOException {
220    // Procedure instance name is the table name.
221    TableName tableName = TableName.valueOf(desc.getInstance());
222    Procedure proc = procMap.get(tableName);
223    if (proc == null) {
224      // The procedure has not even been started yet.
225      // The client would request the procedure and call isProcedureDone().
226      // The HBaseAdmin.execProcedure() wraps both request and isProcedureDone().
227      return false;
228    }
229    // We reply on the existing Distributed Procedure framework to give us the status.
230    return proc.isCompleted();
231  }
232
233}