001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.stream.Collectors;
023import org.apache.hadoop.hbase.DoNotRetryIOException;
024import org.apache.hadoop.hbase.HBaseIOException;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.RegionReplicaUtil;
027import org.apache.hadoop.hbase.client.TableDescriptor;
028import org.apache.hadoop.hbase.master.MasterServices;
029import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
030import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
031import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
032import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
033import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.Strings;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
041import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
042
043import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableProcedureStateData;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableState;
046
047@InterfaceAudience.Private
048public class FlushTableProcedure extends AbstractStateMachineTableProcedure<FlushTableState> {
049  private static final Logger LOG = LoggerFactory.getLogger(FlushTableProcedure.class);
050
051  private TableName tableName;
052
053  private List<byte[]> columnFamilies;
054
055  public FlushTableProcedure() {
056    super();
057  }
058
059  public FlushTableProcedure(MasterProcedureEnv env, TableName tableName) {
060    this(env, tableName, null);
061  }
062
063  public FlushTableProcedure(MasterProcedureEnv env, TableName tableName,
064    List<byte[]> columnFamilies) {
065    super(env);
066    this.tableName = tableName;
067    this.columnFamilies = columnFamilies;
068  }
069
070  @Override
071  protected LockState acquireLock(MasterProcedureEnv env) {
072    // Here we don't acquire table lock because the flush operation and other operations (like
073    // split or merge) are not mutually exclusive. Region will flush memstore when being closed.
074    // It's safe even if we don't have lock. However, currently we are limited by the scheduling
075    // mechanism of the procedure scheduler and have to acquire table shared lock here. See
076    // HBASE-27905 for details.
077    if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName())) {
078      return LockState.LOCK_EVENT_WAIT;
079    }
080    return LockState.LOCK_ACQUIRED;
081  }
082
083  @Override
084  protected void releaseLock(MasterProcedureEnv env) {
085    env.getProcedureScheduler().wakeTableSharedLock(this, getTableName());
086  }
087
088  @Override
089  protected Flow executeFromState(MasterProcedureEnv env, FlushTableState state)
090    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
091    LOG.info("{} execute state={}", this, state);
092
093    try {
094      switch (state) {
095        case FLUSH_TABLE_PREPARE:
096          preflightChecks(env, true);
097          setNextState(FlushTableState.FLUSH_TABLE_FLUSH_REGIONS);
098          return Flow.HAS_MORE_STATE;
099        case FLUSH_TABLE_FLUSH_REGIONS:
100          addChildProcedure(createFlushRegionProcedures(env));
101          return Flow.NO_MORE_STATE;
102        default:
103          throw new UnsupportedOperationException("unhandled state=" + state);
104      }
105    } catch (Exception e) {
106      if (e instanceof DoNotRetryIOException) {
107        // for example, TableNotFoundException or TableNotEnabledException
108        setFailure("master-flush-table", e);
109        LOG.warn("Unrecoverable error trying to flush " + getTableName() + " state=" + state, e);
110      } else {
111        LOG.warn("Retriable error trying to flush " + getTableName() + " state=" + state, e);
112      }
113    }
114    return Flow.HAS_MORE_STATE;
115  }
116
117  @Override
118  protected void preflightChecks(MasterProcedureEnv env, Boolean enabled) throws HBaseIOException {
119    super.preflightChecks(env, enabled);
120    if (columnFamilies == null) {
121      return;
122    }
123    MasterServices master = env.getMasterServices();
124    try {
125      TableDescriptor tableDescriptor = master.getTableDescriptors().get(tableName);
126      List<String> noSuchFamilies = columnFamilies.stream()
127        .filter(cf -> !tableDescriptor.hasColumnFamily(cf)).map(Bytes::toString).toList();
128      if (!noSuchFamilies.isEmpty()) {
129        throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies
130          + " don't exist in table " + tableName.getNameAsString());
131      }
132    } catch (IOException ioe) {
133      if (ioe instanceof HBaseIOException) {
134        throw (HBaseIOException) ioe;
135      }
136      throw new HBaseIOException(ioe);
137    }
138  }
139
140  @Override
141  protected void rollbackState(MasterProcedureEnv env, FlushTableState state)
142    throws IOException, InterruptedException {
143    // nothing to rollback
144  }
145
146  @Override
147  protected FlushTableState getState(int stateId) {
148    return FlushTableState.forNumber(stateId);
149  }
150
151  @Override
152  protected int getStateId(FlushTableState state) {
153    return state.getNumber();
154  }
155
156  @Override
157  protected FlushTableState getInitialState() {
158    return FlushTableState.FLUSH_TABLE_PREPARE;
159  }
160
161  @Override
162  public TableName getTableName() {
163    return tableName;
164  }
165
166  @Override
167  public TableOperationType getTableOperationType() {
168    return TableOperationType.FLUSH;
169  }
170
171  @Override
172  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
173    super.serializeStateData(serializer);
174    FlushTableProcedureStateData.Builder builder = FlushTableProcedureStateData.newBuilder();
175    builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
176    if (columnFamilies != null) {
177      for (byte[] columnFamily : columnFamilies) {
178        if (columnFamily != null && columnFamily.length > 0) {
179          builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
180        }
181      }
182    }
183    serializer.serialize(builder.build());
184  }
185
186  @Override
187  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
188    super.deserializeStateData(serializer);
189    FlushTableProcedureStateData data = serializer.deserialize(FlushTableProcedureStateData.class);
190    this.tableName = ProtobufUtil.toTableName(data.getTableName());
191    if (data.getColumnFamilyCount() > 0) {
192      this.columnFamilies = data.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty())
193        .map(ByteString::toByteArray).collect(Collectors.toList());
194    }
195  }
196
197  private FlushRegionProcedure[] createFlushRegionProcedures(MasterProcedureEnv env) {
198    return env.getAssignmentManager().getTableRegions(getTableName(), true).stream()
199      .filter(r -> RegionReplicaUtil.isDefaultReplica(r))
200      .map(r -> new FlushRegionProcedure(r, columnFamilies)).toArray(FlushRegionProcedure[]::new);
201  }
202
203  @Override
204  public void toStringClassDetails(StringBuilder builder) {
205    builder.append(getClass().getName()).append(", id=").append(getProcId()).append(", table=")
206      .append(tableName);
207    if (columnFamilies != null) {
208      builder.append(", columnFamilies=[")
209        .append(Strings.JOINER
210          .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())))
211        .append("]");
212    }
213  }
214
215  @Override
216  protected void afterReplay(MasterProcedureEnv env) {
217    if (
218      !env.getMasterConfiguration().getBoolean(
219        MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED,
220        MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT)
221    ) {
222      setFailure("master-flush-table", new HBaseIOException("FlushTableProcedureV2 is DISABLED"));
223    }
224  }
225}