001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.IOException; 021import java.io.OutputStream; 022import java.util.concurrent.atomic.AtomicLong; 023import org.apache.hadoop.fs.FSDataOutputStream; 024import org.apache.hadoop.fs.FSDataOutputStreamBuilder; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.fs.StreamCapabilities; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.ExtendedCell; 030import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 031import org.apache.hadoop.hbase.util.AtomicUtils; 032import org.apache.hadoop.hbase.util.CommonFSUtils; 033import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 034import org.apache.hadoop.hbase.wal.FSHLogProvider; 035import org.apache.hadoop.hbase.wal.WAL.Entry; 036import org.apache.hadoop.hdfs.DistributedFileSystem; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; 043 044/** 045 * Writer for protobuf-based WAL. 046 */ 047@InterfaceAudience.Private 048public class ProtobufLogWriter extends AbstractProtobufLogWriter implements FSHLogProvider.Writer { 049 050 private static final Logger LOG = LoggerFactory.getLogger(ProtobufLogWriter.class); 051 052 protected FSDataOutputStream output; 053 054 private final AtomicLong syncedLength = new AtomicLong(0); 055 056 @Override 057 public void append(Entry entry) throws IOException { 058 entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build() 059 .writeDelimitedTo(output); 060 for (Cell cell : entry.getEdit().getCells()) { 061 // cellEncoder must assume little about the stream, since we write PB and cells in turn. 062 cellEncoder.write((ExtendedCell) cell); 063 } 064 length.set(output.getPos()); 065 } 066 067 @Override 068 public void close() throws IOException { 069 if (this.output != null) { 070 if (!trailerWritten) { 071 writeWALTrailer(); 072 } 073 this.output.close(); 074 this.output = null; 075 } 076 } 077 078 @Override 079 public void sync(boolean forceSync) throws IOException { 080 FSDataOutputStream fsdos = this.output; 081 if (fsdos == null) { 082 return; // Presume closed 083 } 084 fsdos.flush(); 085 if (forceSync) { 086 fsdos.hsync(); 087 } else { 088 fsdos.hflush(); 089 } 090 AtomicUtils.updateMax(this.syncedLength, fsdos.getPos()); 091 } 092 093 @Override 094 public long getSyncedLength() { 095 return this.syncedLength.get(); 096 } 097 098 public FSDataOutputStream getStream() { 099 return this.output; 100 } 101 102 @Override 103 protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, 104 short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite) 105 throws IOException, StreamLacksCapabilityException { 106 FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwritable) 107 .bufferSize(bufferSize).replication(replication).blockSize(blockSize); 108 if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) { 109 DistributedFileSystem.HdfsDataOutputStreamBuilder dfsBuilder = 110 (DistributedFileSystem.HdfsDataOutputStreamBuilder) builder; 111 dfsBuilder.replicate(); 112 if (noLocalWrite) { 113 dfsBuilder.noLocalWrite(); 114 } 115 this.output = dfsBuilder.build(); 116 } else { 117 this.output = builder.build(); 118 } 119 120 if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) { 121 if (!output.hasCapability(StreamCapabilities.HFLUSH)) { 122 throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH); 123 } 124 if (!output.hasCapability(StreamCapabilities.HSYNC)) { 125 throw new StreamLacksCapabilityException(StreamCapabilities.HSYNC); 126 } 127 } 128 } 129 130 @Override 131 protected void closeOutputIfNecessary() { 132 if (this.output != null) { 133 try { 134 this.output.close(); 135 } catch (IOException e) { 136 LOG.warn("Close output failed", e); 137 } 138 } 139 } 140 141 @Override 142 protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException { 143 output.write(magic); 144 header.writeDelimitedTo(output); 145 return output.getPos(); 146 } 147 148 @Override 149 protected OutputStream getOutputStreamForCellEncoder() { 150 return this.output; 151 } 152 153 @Override 154 protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException { 155 trailer.writeTo(output); 156 output.writeInt(trailer.getSerializedSize()); 157 output.write(magic); 158 return output.getPos(); 159 } 160}