001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.stream.Collectors; 023import org.apache.hadoop.hbase.DoNotRetryIOException; 024import org.apache.hadoop.hbase.HBaseIOException; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.RegionReplicaUtil; 027import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager; 028import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 029import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 030import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.hbase.util.Strings; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 038import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 039 040import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 041import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableProcedureStateData; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableState; 043 044@InterfaceAudience.Private 045public class FlushTableProcedure extends AbstractStateMachineTableProcedure<FlushTableState> { 046 private static final Logger LOG = LoggerFactory.getLogger(FlushTableProcedure.class); 047 048 private TableName tableName; 049 050 private List<byte[]> columnFamilies; 051 052 public FlushTableProcedure() { 053 super(); 054 } 055 056 public FlushTableProcedure(MasterProcedureEnv env, TableName tableName) { 057 this(env, tableName, null); 058 } 059 060 public FlushTableProcedure(MasterProcedureEnv env, TableName tableName, 061 List<byte[]> columnFamilies) { 062 super(env); 063 this.tableName = tableName; 064 this.columnFamilies = columnFamilies; 065 } 066 067 @Override 068 protected LockState acquireLock(MasterProcedureEnv env) { 069 // Here we don't acquire table lock because the flush operation and other operations (like 070 // split or merge) are not mutually exclusive. Region will flush memstore when being closed. 071 // It's safe even if we don't have lock. However, currently we are limited by the scheduling 072 // mechanism of the procedure scheduler and have to acquire table shared lock here. See 073 // HBASE-27905 for details. 074 if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName())) { 075 return LockState.LOCK_EVENT_WAIT; 076 } 077 return LockState.LOCK_ACQUIRED; 078 } 079 080 @Override 081 protected void releaseLock(MasterProcedureEnv env) { 082 env.getProcedureScheduler().wakeTableSharedLock(this, getTableName()); 083 } 084 085 @Override 086 protected Flow executeFromState(MasterProcedureEnv env, FlushTableState state) 087 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 088 LOG.info("{} execute state={}", this, state); 089 090 try { 091 switch (state) { 092 case FLUSH_TABLE_PREPARE: 093 preflightChecks(env, true); 094 setNextState(FlushTableState.FLUSH_TABLE_FLUSH_REGIONS); 095 return Flow.HAS_MORE_STATE; 096 case FLUSH_TABLE_FLUSH_REGIONS: 097 addChildProcedure(createFlushRegionProcedures(env)); 098 return Flow.NO_MORE_STATE; 099 default: 100 throw new UnsupportedOperationException("unhandled state=" + state); 101 } 102 } catch (Exception e) { 103 if (e instanceof DoNotRetryIOException) { 104 // for example, TableNotFoundException or TableNotEnabledException 105 setFailure("master-flush-table", e); 106 LOG.warn("Unrecoverable error trying to flush " + getTableName() + " state=" + state, e); 107 } else { 108 LOG.warn("Retriable error trying to flush " + getTableName() + " state=" + state, e); 109 } 110 } 111 return Flow.HAS_MORE_STATE; 112 } 113 114 @Override 115 protected void rollbackState(MasterProcedureEnv env, FlushTableState state) 116 throws IOException, InterruptedException { 117 // nothing to rollback 118 } 119 120 @Override 121 protected FlushTableState getState(int stateId) { 122 return FlushTableState.forNumber(stateId); 123 } 124 125 @Override 126 protected int getStateId(FlushTableState state) { 127 return state.getNumber(); 128 } 129 130 @Override 131 protected FlushTableState getInitialState() { 132 return FlushTableState.FLUSH_TABLE_PREPARE; 133 } 134 135 @Override 136 public TableName getTableName() { 137 return tableName; 138 } 139 140 @Override 141 public TableOperationType getTableOperationType() { 142 return TableOperationType.FLUSH; 143 } 144 145 @Override 146 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 147 super.serializeStateData(serializer); 148 FlushTableProcedureStateData.Builder builder = FlushTableProcedureStateData.newBuilder(); 149 builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); 150 if (columnFamilies != null) { 151 for (byte[] columnFamily : columnFamilies) { 152 if (columnFamily != null && columnFamily.length > 0) { 153 builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); 154 } 155 } 156 } 157 serializer.serialize(builder.build()); 158 } 159 160 @Override 161 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 162 super.deserializeStateData(serializer); 163 FlushTableProcedureStateData data = serializer.deserialize(FlushTableProcedureStateData.class); 164 this.tableName = ProtobufUtil.toTableName(data.getTableName()); 165 if (data.getColumnFamilyCount() > 0) { 166 this.columnFamilies = data.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty()) 167 .map(ByteString::toByteArray).collect(Collectors.toList()); 168 } 169 } 170 171 private FlushRegionProcedure[] createFlushRegionProcedures(MasterProcedureEnv env) { 172 return env.getAssignmentManager().getTableRegions(getTableName(), true).stream() 173 .filter(r -> RegionReplicaUtil.isDefaultReplica(r)) 174 .map(r -> new FlushRegionProcedure(r, columnFamilies)).toArray(FlushRegionProcedure[]::new); 175 } 176 177 @Override 178 public void toStringClassDetails(StringBuilder builder) { 179 builder.append(getClass().getName()).append(", id=").append(getProcId()).append(", table=") 180 .append(tableName); 181 if (columnFamilies != null) { 182 builder.append(", columnFamilies=[") 183 .append(Strings.JOINER 184 .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList()))) 185 .append("]"); 186 } 187 } 188 189 @Override 190 protected void afterReplay(MasterProcedureEnv env) { 191 if ( 192 !env.getMasterConfiguration().getBoolean( 193 MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, 194 MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT) 195 ) { 196 setFailure("master-flush-table", new HBaseIOException("FlushTableProcedureV2 is DISABLED")); 197 } 198 } 199}