/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.zip.CRC32;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * A file storage which supports atomic update through two files, i.e., rotating. The
 * implementation does not require atomic rename.
 * <p/>
 * Each {@link #write(byte[])} goes to the file that does not hold the latest data. The data is
 * tagged with a timestamp strictly greater than any timestamp previously read or written by this
 * instance, and the other (older) file is then deleted on a best-effort basis. {@link #read()}
 * picks the file with the larger timestamp. A missing or truncated file is skipped. Other errors,
 * such as a checksum mismatch, are propagated.
 * <p/>
 * On-disk layout of each file: a 4 byte length {@code n}, then {@code n} bytes of a serialized
 * {@code RotateFileData} protobuf message (which carries both the timestamp and the payload), then
 * a 4 byte CRC32 checksum of those {@code n} bytes.
 * <p/>
 * The rotation state ({@code prevTimestamp}, {@code nextFile}) is mutated without any
 * synchronization. NOTE(review): presumably callers must serialize access to an instance.
 */
@InterfaceAudience.Private
public class RotateFile {

  private static final Logger LOG = LoggerFactory.getLogger(RotateFile.class);

  private final FileSystem fs;

  // upper bound on the serialized (protobuf) content length of each file, enforced by both read
  // and write
  private final long maxFileSize;

  private final Path[] files = new Path[2];

  // this is used to make sure that we do not go backwards
  private long prevTimestamp = -1;

  // index into files of the file the next write goes to; -1 until read() has been called
  private int nextFile = -1;

  /**
   * Constructs a new RotateFile object with the given parameters.
   * @param fs          the file system to use.
   * @param dir         the directory where the files will be created.
   * @param name        the base name for the files; the two files are named {@code name-0} and
   *                    {@code name-1}.
   * @param maxFileSize the maximum size of the serialized content of each file.
   */
  public RotateFile(FileSystem fs, Path dir, String name, long maxFileSize) {
    this.fs = fs;
    this.maxFileSize = maxFileSize;
    this.files[0] = new Path(dir, name + "-0");
    this.files[1] = new Path(dir, name + "-1");
  }

  /** Computes the CRC32 of the given bytes, truncated to an int as stored on disk. */
  private static int checksum(byte[] bytes) {
    CRC32 crc32 = new CRC32();
    crc32.update(bytes);
    return (int) crc32.getValue();
  }

  /** Serializes the timestamp and payload into the protobuf bytes stored in a rotate file. */
  private static byte[] serialize(long timestamp, byte[] data) {
    return HBaseProtos.RotateFileData.newBuilder().setTimestamp(timestamp)
      .setData(ByteString.copyFrom(data)).build().toByteArray();
  }

  /**
   * Writes already serialized protobuf bytes to {@code file}, overwriting any existing content.
   */
  private static void writeSerialized(FileSystem fs, Path file, byte[] protoData)
    throws IOException {
    int checksum = checksum(protoData);
    // 4 bytes length of the serialized proto, the serialized proto itself (which carries both the
    // timestamp and the data), and 4 bytes CRC32 checksum of the proto at the end
    try (FSDataOutputStream out = fs.create(file, true)) {
      out.writeInt(protoData.length);
      out.write(protoData);
      out.writeInt(checksum);
    }
  }

  /**
   * Reads and validates a single rotate file.
   * @param path the file to read
   * @return the parsed content of the file
   * @throws FileNotFoundException if the file does not exist (as reported by the file system)
   * @throws EOFException          if the file is shorter than its header claims, e.g. because of
   *                               an interrupted write
   * @throws IOException           if the length header is out of range, the checksum does not
   *                               match, or the protobuf message cannot be parsed
   */
  private HBaseProtos.RotateFileData read(Path path) throws IOException {
    byte[] data;
    int expectedChecksum;
    try (FSDataInputStream in = fs.open(path)) {
      int length = in.readInt();
      if (length <= 0 || length > maxFileSize) {
        throw new IOException("Invalid file length " + length
          + ", either less than or equal to 0 or greater than max allowed size " + maxFileSize);
      }
      data = new byte[length];
      in.readFully(data);
      expectedChecksum = in.readInt();
    }
    int calculatedChecksum = checksum(data);
    if (expectedChecksum != calculatedChecksum) {
      throw new IOException(
        "Checksum mismatch, expected " + expectedChecksum + ", actual " + calculatedChecksum);
    }
    return HBaseProtos.RotateFileData.parseFrom(data);
  }

  /**
   * Picks the index of the winning file: the one with the larger timestamp, or the only non-null
   * one. File 0 wins ties. If both are null, 1 is returned.
   */
  private int select(HBaseProtos.RotateFileData[] datas) {
    if (datas[0] == null) {
      return 1;
    }
    if (datas[1] == null) {
      return 0;
    }
    return datas[0].getTimestamp() >= datas[1].getTimestamp() ? 0 : 1;
  }

  /**
   * Reads the content of the rotate file by selecting the winner file based on the timestamp of
   * the data inside the files. It reads both files and selects the one with the latest timestamp
   * as the winner. If a file is incomplete or does not exist, it logs the error and moves on to
   * the next file. It returns the content of the winner file as a byte array. If none of the files
   * have valid data, it returns null.
   * <p/>
   * This also initializes the rotation state: the next {@link #write(byte[])} goes to the loser
   * file, with a timestamp greater than the winner's.
   * @return a byte array containing the data from the winner file, or null if no valid data is
   *         found.
   * @throws IOException if an error other than a missing or truncated file occurs while reading,
   *                     e.g. a checksum mismatch.
   */
  public byte[] read() throws IOException {
    HBaseProtos.RotateFileData[] datas = new HBaseProtos.RotateFileData[2];
    for (int i = 0; i < 2; i++) {
      try {
        datas[i] = read(files[i]);
      } catch (FileNotFoundException e) {
        LOG.debug("file {} does not exist", files[i], e);
      } catch (EOFException e) {
        LOG.debug("file {} is incomplete", files[i], e);
      }
    }
    int winnerIndex = select(datas);
    nextFile = 1 - winnerIndex;
    if (datas[winnerIndex] != null) {
      prevTimestamp = datas[winnerIndex].getTimestamp();
      return datas[winnerIndex].getData().toByteArray();
    } else {
      return null;
    }
  }

  /**
   * Writes {@code data} with the given {@code timestamp} to {@code file} in the rotate file
   * format. Only intended for tests, e.g. to craft specific file contents.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/RotateFile.java|.*/src/test/.*")
  static void write(FileSystem fs, Path file, long timestamp, byte[] data) throws IOException {
    writeSerialized(fs, file, serialize(timestamp, data));
  }

  /** Fails fast with a descriptive error if {@link #read()} has not initialized the rotation. */
  private void checkInitialized(String operation) {
    if (nextFile < 0) {
      throw new IllegalStateException(
        "read() must be called to initialize the rotation state before calling " + operation
          + "()");
    }
  }

  /**
   * Writes the given data to the next file in the rotation, with a timestamp calculated based on
   * the previous timestamp and the current time to make sure it is greater than the previous
   * timestamp. The method also deletes the previous file, which is no longer needed.
   * <p/>
   * Notice that, for a newly created {@link RotateFile} instance, you need to call {@link #read()}
   * first to initialize the nextFile index, before calling this method.
   * @param data the data to be written to the file
   * @throws IOException           if the data, raw or serialized, exceeds the max allowed size, or
   *                               if an I/O error occurs while writing the data to the file
   * @throws IllegalStateException if {@link #read()} has not been called on this instance yet
   */
  public void write(byte[] data) throws IOException {
    if (data.length > maxFileSize) {
      throw new IOException(
        "Data size " + data.length + " is greater than max allowed size " + maxFileSize);
    }
    checkInitialized("write");
    long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
    byte[] protoData = serialize(timestamp, data);
    // read() rejects files whose serialized length exceeds maxFileSize. The protobuf encoding adds
    // a few bytes (timestamp, field tags, length varint) on top of the raw data, so data that
    // passes the check above can still be too large; writing it would produce a file that can
    // never be read back.
    if (protoData.length > maxFileSize) {
      throw new IOException("Serialized data size " + protoData.length
        + " is greater than max allowed size " + maxFileSize);
    }
    writeSerialized(fs, files[nextFile], protoData);
    prevTimestamp = timestamp;
    nextFile = 1 - nextFile;
    try {
      fs.delete(files[nextFile], false);
    } catch (IOException e) {
      // we will create new file with overwrite = true, so not a big deal here, only for speed up
      // loading as we do not need to read this file when loading
      LOG.debug("Failed to delete old file {}, ignoring the exception", files[nextFile], e);
    }
  }

  /**
   * Deletes the two files used for rotating data. If any of the files cannot be deleted, an
   * IOException is thrown.
   * @throws IOException           if there is an error deleting either file
   * @throws IllegalStateException if {@link #read()} has not been called on this instance yet
   */
  public void delete() throws IOException {
    checkInitialized("delete");
    Path next = files[nextFile];
    // delete next file first, and then the current file, so when failing to delete, we can still
    // read the correct data
    if (fs.exists(next) && !fs.delete(next, false)) {
      throw new IOException("Can not delete " + next);
    }
    Path current = files[1 - nextFile];
    if (fs.exists(current) && !fs.delete(current, false)) {
      throw new IOException("Can not delete " + current);
    }
  }
}