001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import org.apache.hadoop.hbase.HBaseInterfaceAudience; 021import org.apache.hadoop.hbase.client.Mutation; 022import org.apache.hadoop.hbase.wal.WALEdit; 023import org.apache.yetus.audience.InterfaceAudience; 024 025import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 026 027/** 028 * Wraps together the mutations which are applied as a batch to the region and their operation 029 * status and WALEdits. 030 * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preBatchMutate( 031 * org.apache.hadoop.hbase.coprocessor.ObserverContext, MiniBatchOperationInProgress) 032 * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#postBatchMutate( 033 * org.apache.hadoop.hbase.coprocessor.ObserverContext, MiniBatchOperationInProgress) 034 * @param T Pair<Mutation, Integer> pair of Mutations and associated rowlock ids . 035 */ 036@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) 037public class MiniBatchOperationInProgress<T> { 038 private final T[] operations; 039 private Mutation[][] operationsFromCoprocessors; 040 private final OperationStatus[] retCodeDetails; 041 private final WALEdit[] walEditsFromCoprocessors; 042 private final int firstIndex; 043 private final int lastIndexExclusive; 044 045 private int readyToWriteCount = 0; 046 private int cellCount = 0; 047 private int numOfPuts = 0; 048 private int numOfDeletes = 0; 049 private int numOfIncrements = 0; 050 private int numOfAppends = 0; 051 /** 052 * Here is for HBASE-26993,saving the all the {@link Mutation}s if there is 053 * {@link Durability#SKIP_WAL} in {@link HRegion.BatchOperation#buildWALEdits} for 054 * {@link HRegion#doMiniBatchMutate} to also replicate {@link Mutation} which is 055 * {@link Durability#SKIP_WAL} to region replica. 056 */ 057 private WALEdit walEditForReplicateIfExistsSkipWAL = null; 058 059 public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails, 060 WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive, 061 int readyToWriteCount) { 062 Preconditions.checkArgument(readyToWriteCount <= (lastIndexExclusive - firstIndex)); 063 this.operations = operations; 064 this.retCodeDetails = retCodeDetails; 065 this.walEditsFromCoprocessors = walEditsFromCoprocessors; 066 this.firstIndex = firstIndex; 067 this.lastIndexExclusive = lastIndexExclusive; 068 this.readyToWriteCount = readyToWriteCount; 069 } 070 071 /** Returns The number of operations(Mutations) involved in this batch. */ 072 public int size() { 073 return this.lastIndexExclusive - this.firstIndex; 074 } 075 076 /** Returns The operation(Mutation) at the specified position. */ 077 public T getOperation(int index) { 078 return operations[getAbsoluteIndex(index)]; 079 } 080 081 /** 082 * Sets the status code for the operation(Mutation) at the specified position. By setting this 083 * status, {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} can make HRegion to skip 084 * Mutations. 085 */ 086 public void setOperationStatus(int index, OperationStatus opStatus) { 087 this.retCodeDetails[getAbsoluteIndex(index)] = opStatus; 088 } 089 090 /** Returns Gets the status code for the operation(Mutation) at the specified position. */ 091 public OperationStatus getOperationStatus(int index) { 092 return this.retCodeDetails[getAbsoluteIndex(index)]; 093 } 094 095 /** 096 * Sets the walEdit for the operation(Mutation) at the specified position. 097 */ 098 public void setWalEdit(int index, WALEdit walEdit) { 099 this.walEditsFromCoprocessors[getAbsoluteIndex(index)] = walEdit; 100 } 101 102 /** Returns Gets the walEdit for the operation(Mutation) at the specified position. */ 103 public WALEdit getWalEdit(int index) { 104 return this.walEditsFromCoprocessors[getAbsoluteIndex(index)]; 105 } 106 107 private int getAbsoluteIndex(int index) { 108 if (index < 0 || this.firstIndex + index >= this.lastIndexExclusive) { 109 throw new ArrayIndexOutOfBoundsException(index); 110 } 111 return this.firstIndex + index; 112 } 113 114 /** 115 * Add more Mutations corresponding to the Mutation at the given index to be committed atomically 116 * in the same batch. These mutations are applied to the WAL and applied to the memstore as well. 117 * The timestamp of the cells in the given Mutations MUST be obtained from the original mutation. 118 * <b>Note:</b> The durability from CP will be replaced by the durability of corresponding 119 * mutation. <b>Note:</b> Currently only supports Put and Delete operations. 120 * @param index the index that corresponds to the original mutation index in the batch 121 * @param newOperations the Mutations to add 122 */ 123 public void addOperationsFromCP(int index, Mutation[] newOperations) { 124 if (this.operationsFromCoprocessors == null) { 125 // lazy allocation to save on object allocation in case this is not used 126 this.operationsFromCoprocessors = new Mutation[operations.length][]; 127 } 128 this.operationsFromCoprocessors[getAbsoluteIndex(index)] = newOperations; 129 } 130 131 public Mutation[] getOperationsFromCoprocessors(int index) { 132 return operationsFromCoprocessors == null 133 ? null 134 : operationsFromCoprocessors[getAbsoluteIndex(index)]; 135 } 136 137 public int getReadyToWriteCount() { 138 return readyToWriteCount; 139 } 140 141 public int getLastIndexExclusive() { 142 return lastIndexExclusive; 143 } 144 145 public int getCellCount() { 146 return cellCount; 147 } 148 149 public void addCellCount(int cellCount) { 150 this.cellCount += cellCount; 151 } 152 153 public int getNumOfPuts() { 154 return numOfPuts; 155 } 156 157 public void incrementNumOfPuts() { 158 this.numOfPuts += 1; 159 } 160 161 public int getNumOfDeletes() { 162 return numOfDeletes; 163 } 164 165 public void incrementNumOfDeletes() { 166 this.numOfDeletes += 1; 167 } 168 169 public int getNumOfIncrements() { 170 return numOfIncrements; 171 } 172 173 public void incrementNumOfIncrements() { 174 this.numOfIncrements += 1; 175 } 176 177 public int getNumOfAppends() { 178 return numOfAppends; 179 } 180 181 public void incrementNumOfAppends() { 182 this.numOfAppends += 1; 183 } 184 185 public WALEdit getWalEditForReplicateIfExistsSkipWAL() { 186 return walEditForReplicateIfExistsSkipWAL; 187 } 188 189 public void setWalEditForReplicateIfExistsSkipWAL(WALEdit walEditForReplicateSkipWAL) { 190 this.walEditForReplicateIfExistsSkipWAL = walEditForReplicateSkipWAL; 191 } 192}