/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ForkJoinPool;
import java.util.regex.Pattern;
import java.util.zip.CRC32;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

/**
 * To fully avoid listing, here we use two files for tracking. When loading, we will try to read
 * both files: if only one exists, we will trust it; if both exist, we will compare their
 * timestamps and trust the newer one. We record in memory which one we trust, and when we need to
 * update the store file list, we will write to the other file.
 * <p/>
 * In this way, we can avoid listing when we want to load the store file list file.
 * <p/>
 * To prevent loading a partial file, we use the first 4 bytes as the file length, and also append
 * a 4 byte crc32 checksum at the end. This is because the protobuf message parser can sometimes
 * return without error on partial bytes if the data is truncated at certain points, but the
 * returned message will have incorrect field values. We should try our best to prevent this from
 * happening, because loading an incorrect store file list file usually leads to data loss.
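 * <p/>
 * The resulting on-disk layout, as written by {@link #write(FileSystem, Path, StoreFileList)},
 * is:
 *
 * <pre>
 * | length (4 byte int) | StoreFileList protobuf bytes | crc32 over the protobuf bytes (4 byte int) |
 * </pre>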
 * <p/>
 * To prevent failing silently while downgrading, where we may miss some newly introduced fields
 * in {@link StoreFileList} which are necessary, we introduce a 'version' field in
 * {@link StoreFileList}. If we find out that we are reading a {@link StoreFileList} with a higher
 * version, we will fail immediately and tell users that extra steps are needed while downgrading,
 * to prevent potential data loss.
 */
@InterfaceAudience.Private
class StoreFileListFile {

  private static final Logger LOG = LoggerFactory.getLogger(StoreFileListFile.class);

  // the current version for StoreFileList
  static final long VERSION = 1;

  static final String TRACK_FILE_DIR = ".filelist";

  static final String TRACK_FILE_PREFIX = "f1";

  private static final String TRACK_FILE_ROTATE_PREFIX = "f2";

  static final char TRACK_FILE_SEPARATOR = '.';

  static final Pattern TRACK_FILE_PATTERN = Pattern.compile("^f(1|2)(\\.\\d+)?$");

  // 16 MB, which is big enough for a tracker file
  private static final int MAX_FILE_SIZE = 16 * 1024 * 1024;

  private final StoreContext ctx;

  private final Path trackFileDir;

  private final Path[] trackFiles = new Path[2];

  // this is used to make sure that we do not go backwards
  private long prevTimestamp = -1;

  private int nextTrackFile = -1;

  StoreFileListFile(StoreContext ctx) {
    this.ctx = ctx;
    trackFileDir = new Path(ctx.getFamilyStoreDirectoryPath(), TRACK_FILE_DIR);
  }

  static StoreFileList load(FileSystem fs, Path path) throws IOException {
    byte[] data;
    int expectedChecksum;
    try (FSDataInputStream in = fs.open(path)) {
      int length = in.readInt();
      if (length <= 0 || length > MAX_FILE_SIZE) {
        throw new IOException("Invalid file length " + length
          + ", either less than 0 or greater than max allowed size " + MAX_FILE_SIZE);
      }
      data = new byte[length];
      in.readFully(data);
      expectedChecksum = in.readInt();
    }
    CRC32 crc32 = new CRC32();
    crc32.update(data);
    int calculatedChecksum = (int) crc32.getValue();
    if (expectedChecksum != calculatedChecksum) {
      throw new IOException(
        "Checksum mismatch, expected " + expectedChecksum + ", actual " + calculatedChecksum);
    }
    StoreFileList storeFileList = StoreFileList.parseFrom(data);
    if (storeFileList.getVersion() > VERSION) {
      LOG.error(
        "The loaded store file list is in version {}, which is higher than expected"
          + " version {}. Stop loading to prevent potential data loss. This is usually because"
          + " your cluster was downgraded from a newer version. You need extra steps before"
          + " downgrading, like switching back to the default store file tracker.",
        storeFileList.getVersion(), VERSION);
      throw new IOException("Higher store file list version detected, expected " + VERSION
        + ", got " + storeFileList.getVersion());
    }
    return storeFileList;
  }

  StoreFileList load(Path path) throws IOException {
    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    return load(fs, path);
  }

  private int select(StoreFileList[] lists) {
    if (lists[0] == null) {
      return 1;
    }
    if (lists[1] == null) {
      return 0;
    }
    return lists[0].getTimestamp() >= lists[1].getTimestamp() ? 0 : 1;
  }
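
  // Track file names consist of a prefix ("f1" or "f2") plus an optional separator and a sequence
  // id, e.g. "f1.1695012345678" (the sequence id value here is illustrative; it is assigned in
  // initializeTrackFiles below). Old releases wrote bare "f1"/"f2" names, which is why the suffix
  // is optional in TRACK_FILE_PATTERN and why a missing suffix is treated as sequence id 0.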
  // file sequence id to path
  private NavigableMap<Long, List<Path>> listFiles() throws IOException {
    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    FileStatus[] statuses;
    try {
      statuses = fs.listStatus(trackFileDir);
    } catch (FileNotFoundException e) {
      LOG.debug("Track file directory {} does not exist", trackFileDir, e);
      return Collections.emptyNavigableMap();
    }
    if (statuses == null || statuses.length == 0) {
      return Collections.emptyNavigableMap();
    }
    TreeMap<Long, List<Path>> map = new TreeMap<>(Comparator.reverseOrder());
    for (FileStatus status : statuses) {
      Path file = status.getPath();
      if (!status.isFile()) {
        LOG.warn("Found invalid track file {}, which is not a file", file);
        continue;
      }
      if (!TRACK_FILE_PATTERN.matcher(file.getName()).matches()) {
        LOG.warn("Found invalid track file {}, skip", file);
        continue;
      }
      List<String> parts = Splitter.on(TRACK_FILE_SEPARATOR).splitToList(file.getName());
      // For compatibility, set the timestamp to 0 if it is missing in the file name.
      long timestamp = parts.size() > 1 ? Long.parseLong(parts.get(1)) : 0L;
      map.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(file);
    }
    return map;
  }

  private void initializeTrackFiles(long seqId) {
    trackFiles[0] = new Path(trackFileDir, TRACK_FILE_PREFIX + TRACK_FILE_SEPARATOR + seqId);
    trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE_PREFIX + TRACK_FILE_SEPARATOR + seqId);
    LOG.info("Initialized track files: {}, {}", trackFiles[0], trackFiles[1]);
  }

  private void cleanUpTrackFiles(long loadedSeqId,
    NavigableMap<Long, List<Path>> seqId2TrackFiles) {
    LOG.info("Cleaning up track files with sequence id < {}", loadedSeqId);
    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    NavigableMap<Long, List<Path>> toDelete =
      loadedSeqId >= 0 ? seqId2TrackFiles.tailMap(loadedSeqId, false) : seqId2TrackFiles;
    toDelete.values().stream().flatMap(l -> l.stream()).forEach(file -> {
      ForkJoinPool.commonPool().execute(() -> {
        LOG.info("Deleting track file {}", file);
        try {
          fs.delete(file, false);
        } catch (IOException e) {
          LOG.warn("Failed to delete unused track file {}", file, e);
        }
      });
    });
  }
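
  /**
   * Load the newest complete store file list, if any.
   * <p/>
   * When {@code readOnly} is false, we also prepare for future updates: stale track files are
   * cleaned up, the track file names for the next sequence id are initialized, and
   * {@code nextTrackFile}/{@code prevTimestamp} are set so that the next
   * {@link #update(StoreFileList.Builder)} call writes to the file we do not currently trust.
   */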
  StoreFileList load(boolean readOnly) throws IOException {
    NavigableMap<Long, List<Path>> seqId2TrackFiles = listFiles();
    long seqId = -1L;
    StoreFileList[] lists = new StoreFileList[2];
    for (Map.Entry<Long, List<Path>> entry : seqId2TrackFiles.entrySet()) {
      List<Path> files = entry.getValue();
      // should not have more than 2 files; if we do, it means that the track files are broken,
      // so just throw the exception out and fail the region open.
      if (files.size() > 2) {
        throw new DoNotRetryIOException("Should only have at most 2 track files for sequence id "
          + entry.getKey() + ", but got " + files.size() + " files: " + files);
      }
      boolean loaded = false;
      for (int i = 0; i < files.size(); i++) {
        try {
          lists[i] = load(files.get(i));
          loaded = true;
        } catch (EOFException e) {
          // this is a normal case, so just log at debug
          LOG.debug("EOF loading track file {}, ignoring the exception", files.get(i), e);
        }
      }
      if (loaded) {
        seqId = entry.getKey();
        break;
      }
    }
    if (readOnly) {
      return lists[select(lists)];
    }

    cleanUpTrackFiles(seqId, seqId2TrackFiles);

    if (seqId < 0) {
      initializeTrackFiles(System.currentTimeMillis());
      nextTrackFile = 0;
      return null;
    }

    initializeTrackFiles(Math.max(System.currentTimeMillis(), seqId + 1));
    int winnerIndex = select(lists);
    nextTrackFile = 1 - winnerIndex;
    prevTimestamp = lists[winnerIndex].getTimestamp();
    return lists[winnerIndex];
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/StoreFileListFile.java|.*/src/test/.*")
  static void write(FileSystem fs, Path file, StoreFileList storeFileList) throws IOException {
    byte[] data = storeFileList.toByteArray();
    CRC32 crc32 = new CRC32();
    crc32.update(data);
    int checksum = (int) crc32.getValue();
    // 4 bytes length at the beginning, plus 4 bytes checksum at the end
    try (FSDataOutputStream out = fs.create(file, true)) {
      out.writeInt(data.length);
      out.write(data);
      out.writeInt(checksum);
    }
  }

  /**
   * We will set the timestamp and version in this method, so callers just pass the builder in.
   */
  void update(StoreFileList.Builder builder) throws IOException {
    if (nextTrackFile < 0) {
      // we need to call load first to load the prevTimestamp and also the next file index;
      // we are already in the update method, which is not read only, so pass false
      load(false);
    }
    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
    write(fs, trackFiles[nextTrackFile],
      builder.setTimestamp(timestamp).setVersion(VERSION).build());
    // record the timestamp so we never go backwards
    prevTimestamp = timestamp;
    // rotate the file
    nextTrackFile = 1 - nextTrackFile;

    try {
      fs.delete(trackFiles[nextTrackFile], false);
    } catch (IOException e) {
      // we will create the new file with overwrite = true, so this is not a big deal; it only
      // speeds up loading, as we do not need to read this file when loading
      LOG.debug("Failed to delete old track file {}, ignoring the exception",
        trackFiles[nextTrackFile], e);
    }
  }
}