001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Map;
025import java.util.NavigableMap;
026import java.util.TreeMap;
027import java.util.function.Function;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.client.RegionInfo;
033import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.CommonFSUtils;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037import org.apache.hadoop.hbase.wal.WAL;
038import org.apache.hadoop.hbase.wal.WALEdit;
039import org.apache.hadoop.hbase.wal.WALKeyImpl;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
045
046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
050
051/**
052 * Helper methods to ease Region Server integration with the Write Ahead Log (WAL). Note that
053 * methods in this class specifically should not require access to anything other than the API found
054 * in {@link WAL}. For internal use only.
055 */
056@InterfaceAudience.Private
057public class WALUtil {
058  private static final Logger LOG = LoggerFactory.getLogger(WALUtil.class);
059
060  public static final String WAL_BLOCK_SIZE = "hbase.regionserver.hlog.blocksize";
061
062  private WALUtil() {
063    // Shut down construction of this class.
064  }
065
066  /**
067   * Write the marker that a compaction has succeeded and is about to be committed. This provides
068   * info to the HMaster to allow it to recover the compaction if this regionserver dies in the
069   * middle. It also prevents the compaction from finishing if this regionserver has already lost
070   * its lease on the log.
071   * <p/>
072   * This write is for internal use only. Not for external client consumption.
073   * @param mvcc Used by WAL to get sequence Id for the waledit.
074   */
075  public static WALKeyImpl writeCompactionMarker(WAL wal,
076    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
077    MultiVersionConcurrencyControl mvcc) throws IOException {
078    WALKeyImpl walKey =
079      writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
080    if (LOG.isTraceEnabled()) {
081      LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
082    }
083    return walKey;
084  }
085
086  /**
087   * Write a flush marker indicating a start / abort or a complete of a region flush
088   * <p/>
089   * This write is for internal use only. Not for external client consumption.
090   */
091  public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
092    RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
093    throws IOException {
094    WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
095      WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
096    if (LOG.isTraceEnabled()) {
097      LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
098    }
099    return walKey;
100  }
101
102  /**
103   * Write a region open marker indicating that the region is opened. This write is for internal use
104   * only. Not for external client consumption.
105   */
106  public static WALKeyImpl writeRegionEventMarker(WAL wal,
107    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r,
108    MultiVersionConcurrencyControl mvcc) throws IOException {
109    WALKeyImpl walKey =
110      writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
111    if (LOG.isTraceEnabled()) {
112      LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
113    }
114    return walKey;
115  }
116
117  /**
118   * Write a log marker that a bulk load has succeeded and is about to be committed. This write is
119   * for internal use only. Not for external client consumption.
120   * @param wal              The log to write into.
121   * @param replicationScope The replication scope of the families in the HRegion
122   * @param hri              A description of the region in the table that we are bulk loading into.
123   * @param desc             A protocol buffers based description of the client's bulk loading
124   *                         request
125   * @return walKey with sequenceid filled out for this bulk load marker
126   * @throws IOException We will throw an IOException if we can not append to the HLog.
127   */
128  public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
129    final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
130    final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
131    throws IOException {
132    WALKeyImpl walKey =
133      writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
134    if (LOG.isTraceEnabled()) {
135      LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
136    }
137    return walKey;
138  }
139
140  private static WALKeyImpl writeMarker(final WAL wal,
141    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, WALEdit edit,
142    MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes)
143    throws IOException {
144    // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
145    return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc, extendedAttributes,
146      true);
147  }
148
149  /**
150   * A 'full' WAL transaction involves starting an mvcc transaction followed by an append, an
151   * optional sync, and then a call to complete the mvcc transaction. This method does it all. Good
152   * for case of adding a single edit or marker to the WAL.
153   * <p/>
154   * This write is for internal use only. Not for external client consumption.
155   * @return WALKeyImpl that was added to the WAL.
156   */
157  private static WALKeyImpl doFullMarkerAppendTransaction(WAL wal,
158    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final WALEdit edit,
159    MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes, boolean sync)
160    throws IOException {
161    // TODO: Pass in current time to use?
162    WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
163      EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
164    long trx = MultiVersionConcurrencyControl.NONE;
165    try {
166      trx = wal.appendMarker(hri, walKey, edit);
167      if (sync) {
168        wal.sync(trx);
169      }
170      // Call complete only here because these are markers only. They are not for clients to read.
171      mvcc.complete(walKey.getWriteEntry());
172    } catch (IOException ioe) {
173      if (walKey.getWriteEntry() != null) {
174        mvcc.complete(walKey.getWriteEntry());
175      }
176      /**
177       * Here we do not abort the RegionServer for {@link WALSyncTimeoutIOException} as
178       * {@link HRegion#doWALAppend} does,because WAL Marker just records the internal state and
179       * seems it is no need to always abort the RegionServer when {@link WAL#sync} timeout,it is
180       * the internal state transition that determines whether RegionServer is aborted or not.
181       */
182      throw ioe;
183    }
184    return walKey;
185  }
186
187  /**
188   * Blocksize returned here is 2x the default HDFS blocksize unless explicitly set in
189   * Configuration. Works in tandem with hbase.regionserver.logroll.multiplier. See comment in
190   * AbstractFSWAL in Constructor where we set blocksize and logrollsize for why.
191   * @return Blocksize to use writing WALs.
192   */
193  public static long getWALBlockSize(Configuration conf, FileSystem fs, Path dir)
194    throws IOException {
195    return getWALBlockSize(conf, fs, dir, false);
196  }
197
198  /**
199   * Public because of FSHLog. Should be package-private
200   * @param isRecoverEdits the created writer is for recovered edits or WAL. For recovered edits, it
201   *                       is true and for WAL it is false.
202   */
203  public static long getWALBlockSize(Configuration conf, FileSystem fs, Path dir,
204    boolean isRecoverEdits) throws IOException {
205    long defaultBlockSize = CommonFSUtils.getDefaultBlockSize(fs, dir) * 2;
206    if (isRecoverEdits) {
207      return conf.getLong("hbase.regionserver.recoverededits.blocksize", defaultBlockSize);
208    }
209    return conf.getLong(WAL_BLOCK_SIZE, defaultBlockSize);
210  }
211
212  public static void filterCells(WALEdit edit, Function<Cell, Cell> mapper) {
213    ArrayList<Cell> cells = edit.getCells();
214    int size = cells.size();
215    int newSize = 0;
216    for (int i = 0; i < size; i++) {
217      Cell cell = mapper.apply(cells.get(i));
218      if (cell != null) {
219        cells.set(newSize, cell);
220        newSize++;
221      }
222    }
223    for (int i = size - 1; i >= newSize; i--) {
224      cells.remove(i);
225    }
226    if (newSize < size / 2) {
227      cells.trimToSize();
228    }
229  }
230
231  public static void writeReplicationMarkerAndSync(WAL wal, MultiVersionConcurrencyControl mvcc,
232    RegionInfo regionInfo, byte[] rowKey, long timestamp) throws IOException {
233    NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
234    replicationScope.put(WALEdit.METAFAMILY, REPLICATION_SCOPE_GLOBAL);
235    writeMarker(wal, replicationScope, regionInfo,
236      WALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null);
237  }
238}