001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import org.apache.hadoop.hbase.HBaseInterfaceAudience;
021import org.apache.hadoop.hbase.client.Mutation;
022import org.apache.hadoop.hbase.wal.WALEdit;
023import org.apache.yetus.audience.InterfaceAudience;
024
025import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
026
027/**
028 * Wraps together the mutations which are applied as a batch to the region and their operation
029 * status and WALEdits.
030 * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preBatchMutate(
031 *      org.apache.hadoop.hbase.coprocessor.ObserverContext, MiniBatchOperationInProgress)
032 * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#postBatchMutate(
033 *      org.apache.hadoop.hbase.coprocessor.ObserverContext, MiniBatchOperationInProgress)
034 * @param T Pair<Mutation, Integer> pair of Mutations and associated rowlock ids .
035 */
036@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
037public class MiniBatchOperationInProgress<T> {
038  private final T[] operations;
039  private Mutation[][] operationsFromCoprocessors;
040  private final OperationStatus[] retCodeDetails;
041  private final WALEdit[] walEditsFromCoprocessors;
042  private final int firstIndex;
043  private final int lastIndexExclusive;
044
045  private int readyToWriteCount = 0;
046  private int cellCount = 0;
047  private int numOfPuts = 0;
048  private int numOfDeletes = 0;
049  private int numOfIncrements = 0;
050  private int numOfAppends = 0;
051  /**
052   * Here is for HBASE-26993,saving the all the {@link Mutation}s if there is
053   * {@link Durability#SKIP_WAL} in {@link HRegion.BatchOperation#buildWALEdits} for
054   * {@link HRegion#doMiniBatchMutate} to also replicate {@link Mutation} which is
055   * {@link Durability#SKIP_WAL} to region replica.
056   */
057  private WALEdit walEditForReplicateIfExistsSkipWAL = null;
058
059  public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails,
060    WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive,
061    int readyToWriteCount) {
062    Preconditions.checkArgument(readyToWriteCount <= (lastIndexExclusive - firstIndex));
063    this.operations = operations;
064    this.retCodeDetails = retCodeDetails;
065    this.walEditsFromCoprocessors = walEditsFromCoprocessors;
066    this.firstIndex = firstIndex;
067    this.lastIndexExclusive = lastIndexExclusive;
068    this.readyToWriteCount = readyToWriteCount;
069  }
070
071  /** Returns The number of operations(Mutations) involved in this batch. */
072  public int size() {
073    return this.lastIndexExclusive - this.firstIndex;
074  }
075
076  /** Returns The operation(Mutation) at the specified position. */
077  public T getOperation(int index) {
078    return operations[getAbsoluteIndex(index)];
079  }
080
081  /**
082   * Sets the status code for the operation(Mutation) at the specified position. By setting this
083   * status, {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} can make HRegion to skip
084   * Mutations.
085   */
086  public void setOperationStatus(int index, OperationStatus opStatus) {
087    this.retCodeDetails[getAbsoluteIndex(index)] = opStatus;
088  }
089
090  /** Returns Gets the status code for the operation(Mutation) at the specified position. */
091  public OperationStatus getOperationStatus(int index) {
092    return this.retCodeDetails[getAbsoluteIndex(index)];
093  }
094
095  /**
096   * Sets the walEdit for the operation(Mutation) at the specified position.
097   */
098  public void setWalEdit(int index, WALEdit walEdit) {
099    this.walEditsFromCoprocessors[getAbsoluteIndex(index)] = walEdit;
100  }
101
102  /** Returns Gets the walEdit for the operation(Mutation) at the specified position. */
103  public WALEdit getWalEdit(int index) {
104    return this.walEditsFromCoprocessors[getAbsoluteIndex(index)];
105  }
106
107  private int getAbsoluteIndex(int index) {
108    if (index < 0 || this.firstIndex + index >= this.lastIndexExclusive) {
109      throw new ArrayIndexOutOfBoundsException(index);
110    }
111    return this.firstIndex + index;
112  }
113
114  /**
115   * Add more Mutations corresponding to the Mutation at the given index to be committed atomically
116   * in the same batch. These mutations are applied to the WAL and applied to the memstore as well.
117   * The timestamp of the cells in the given Mutations MUST be obtained from the original mutation.
118   * <b>Note:</b> The durability from CP will be replaced by the durability of corresponding
119   * mutation. <b>Note:</b> Currently only supports Put and Delete operations.
120   * @param index         the index that corresponds to the original mutation index in the batch
121   * @param newOperations the Mutations to add
122   */
123  public void addOperationsFromCP(int index, Mutation[] newOperations) {
124    if (this.operationsFromCoprocessors == null) {
125      // lazy allocation to save on object allocation in case this is not used
126      this.operationsFromCoprocessors = new Mutation[operations.length][];
127    }
128    this.operationsFromCoprocessors[getAbsoluteIndex(index)] = newOperations;
129  }
130
131  public Mutation[] getOperationsFromCoprocessors(int index) {
132    return operationsFromCoprocessors == null
133      ? null
134      : operationsFromCoprocessors[getAbsoluteIndex(index)];
135  }
136
137  public int getReadyToWriteCount() {
138    return readyToWriteCount;
139  }
140
141  public int getLastIndexExclusive() {
142    return lastIndexExclusive;
143  }
144
145  public int getCellCount() {
146    return cellCount;
147  }
148
149  public void addCellCount(int cellCount) {
150    this.cellCount += cellCount;
151  }
152
153  public int getNumOfPuts() {
154    return numOfPuts;
155  }
156
157  public void incrementNumOfPuts() {
158    this.numOfPuts += 1;
159  }
160
161  public int getNumOfDeletes() {
162    return numOfDeletes;
163  }
164
165  public void incrementNumOfDeletes() {
166    this.numOfDeletes += 1;
167  }
168
169  public int getNumOfIncrements() {
170    return numOfIncrements;
171  }
172
173  public void incrementNumOfIncrements() {
174    this.numOfIncrements += 1;
175  }
176
177  public int getNumOfAppends() {
178    return numOfAppends;
179  }
180
181  public void incrementNumOfAppends() {
182    this.numOfAppends += 1;
183  }
184
185  public WALEdit getWalEditForReplicateIfExistsSkipWAL() {
186    return walEditForReplicateIfExistsSkipWAL;
187  }
188
189  public void setWalEditForReplicateIfExistsSkipWAL(WALEdit walEditForReplicateSkipWAL) {
190    this.walEditForReplicateIfExistsSkipWAL = walEditForReplicateSkipWAL;
191  }
192}