001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Map; 025import java.util.NavigableMap; 026import java.util.TreeMap; 027import java.util.function.Function; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.client.RegionInfo; 033import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.CommonFSUtils; 036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 037import org.apache.hadoop.hbase.wal.WAL; 038import org.apache.hadoop.hbase.wal.WALEdit; 039import org.apache.hadoop.hbase.wal.WALKeyImpl; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 045 046import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 049import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 050 051/** 052 * Helper methods to ease Region Server integration with the Write Ahead Log (WAL). Note that 053 * methods in this class specifically should not require access to anything other than the API found 054 * in {@link WAL}. For internal use only. 055 */ 056@InterfaceAudience.Private 057public class WALUtil { 058 private static final Logger LOG = LoggerFactory.getLogger(WALUtil.class); 059 060 public static final String WAL_BLOCK_SIZE = "hbase.regionserver.hlog.blocksize"; 061 062 private WALUtil() { 063 // Shut down construction of this class. 064 } 065 066 /** 067 * Write the marker that a compaction has succeeded and is about to be committed. This provides 068 * info to the HMaster to allow it to recover the compaction if this regionserver dies in the 069 * middle. It also prevents the compaction from finishing if this regionserver has already lost 070 * its lease on the log. 071 * <p/> 072 * This write is for internal use only. Not for external client consumption. 073 * @param mvcc Used by WAL to get sequence Id for the waledit. 074 */ 075 public static WALKeyImpl writeCompactionMarker(WAL wal, 076 NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c, 077 MultiVersionConcurrencyControl mvcc) throws IOException { 078 WALKeyImpl walKey = 079 writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null); 080 if (LOG.isTraceEnabled()) { 081 LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); 082 } 083 return walKey; 084 } 085 086 /** 087 * Write a flush marker indicating a start / abort or a complete of a region flush 088 * <p/> 089 * This write is for internal use only. Not for external client consumption. 090 */ 091 public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope, 092 RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc) 093 throws IOException { 094 WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri, 095 WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync); 096 if (LOG.isTraceEnabled()) { 097 LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f)); 098 } 099 return walKey; 100 } 101 102 /** 103 * Write a region open marker indicating that the region is opened. This write is for internal use 104 * only. Not for external client consumption. 105 */ 106 public static WALKeyImpl writeRegionEventMarker(WAL wal, 107 NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r, 108 MultiVersionConcurrencyControl mvcc) throws IOException { 109 WALKeyImpl walKey = 110 writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc, null); 111 if (LOG.isTraceEnabled()) { 112 LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r)); 113 } 114 return walKey; 115 } 116 117 /** 118 * Write a log marker that a bulk load has succeeded and is about to be committed. This write is 119 * for internal use only. Not for external client consumption. 120 * @param wal The log to write into. 121 * @param replicationScope The replication scope of the families in the HRegion 122 * @param hri A description of the region in the table that we are bulk loading into. 123 * @param desc A protocol buffers based description of the client's bulk loading 124 * request 125 * @return walKey with sequenceid filled out for this bulk load marker 126 * @throws IOException We will throw an IOException if we can not append to the HLog. 127 */ 128 public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal, 129 final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, 130 final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc) 131 throws IOException { 132 WALKeyImpl walKey = 133 writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, null); 134 if (LOG.isTraceEnabled()) { 135 LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc)); 136 } 137 return walKey; 138 } 139 140 private static WALKeyImpl writeMarker(final WAL wal, 141 NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, WALEdit edit, 142 MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes) 143 throws IOException { 144 // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT 145 return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc, extendedAttributes, 146 true); 147 } 148 149 /** 150 * A 'full' WAL transaction involves starting an mvcc transaction followed by an append, an 151 * optional sync, and then a call to complete the mvcc transaction. This method does it all. Good 152 * for case of adding a single edit or marker to the WAL. 153 * <p/> 154 * This write is for internal use only. Not for external client consumption. 155 * @return WALKeyImpl that was added to the WAL. 156 */ 157 private static WALKeyImpl doFullMarkerAppendTransaction(WAL wal, 158 NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final WALEdit edit, 159 MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes, boolean sync) 160 throws IOException { 161 // TODO: Pass in current time to use? 162 WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), 163 EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes); 164 long trx = MultiVersionConcurrencyControl.NONE; 165 try { 166 trx = wal.appendMarker(hri, walKey, edit); 167 if (sync) { 168 wal.sync(trx); 169 } 170 // Call complete only here because these are markers only. They are not for clients to read. 171 mvcc.complete(walKey.getWriteEntry()); 172 } catch (IOException ioe) { 173 if (walKey.getWriteEntry() != null) { 174 mvcc.complete(walKey.getWriteEntry()); 175 } 176 /** 177 * Here we do not abort the RegionServer for {@link WALSyncTimeoutIOException} as 178 * {@link HRegion#doWALAppend} does,because WAL Marker just records the internal state and 179 * seems it is no need to always abort the RegionServer when {@link WAL#sync} timeout,it is 180 * the internal state transition that determines whether RegionServer is aborted or not. 181 */ 182 throw ioe; 183 } 184 return walKey; 185 } 186 187 /** 188 * Blocksize returned here is 2x the default HDFS blocksize unless explicitly set in 189 * Configuration. Works in tandem with hbase.regionserver.logroll.multiplier. See comment in 190 * AbstractFSWAL in Constructor where we set blocksize and logrollsize for why. 191 * @return Blocksize to use writing WALs. 192 */ 193 public static long getWALBlockSize(Configuration conf, FileSystem fs, Path dir) 194 throws IOException { 195 return getWALBlockSize(conf, fs, dir, false); 196 } 197 198 /** 199 * Public because of FSHLog. Should be package-private 200 * @param isRecoverEdits the created writer is for recovered edits or WAL. For recovered edits, it 201 * is true and for WAL it is false. 202 */ 203 public static long getWALBlockSize(Configuration conf, FileSystem fs, Path dir, 204 boolean isRecoverEdits) throws IOException { 205 long defaultBlockSize = CommonFSUtils.getDefaultBlockSize(fs, dir) * 2; 206 if (isRecoverEdits) { 207 return conf.getLong("hbase.regionserver.recoverededits.blocksize", defaultBlockSize); 208 } 209 return conf.getLong(WAL_BLOCK_SIZE, defaultBlockSize); 210 } 211 212 public static void filterCells(WALEdit edit, Function<Cell, Cell> mapper) { 213 ArrayList<Cell> cells = edit.getCells(); 214 int size = cells.size(); 215 int newSize = 0; 216 for (int i = 0; i < size; i++) { 217 Cell cell = mapper.apply(cells.get(i)); 218 if (cell != null) { 219 cells.set(newSize, cell); 220 newSize++; 221 } 222 } 223 for (int i = size - 1; i >= newSize; i--) { 224 cells.remove(i); 225 } 226 if (newSize < size / 2) { 227 cells.trimToSize(); 228 } 229 } 230 231 public static void writeReplicationMarkerAndSync(WAL wal, MultiVersionConcurrencyControl mvcc, 232 RegionInfo regionInfo, byte[] rowKey, long timestamp) throws IOException { 233 NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR); 234 replicationScope.put(WALEdit.METAFAMILY, REPLICATION_SCOPE_GLOBAL); 235 writeMarker(wal, replicationScope, regionInfo, 236 WALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null); 237 } 238}