001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.storefiletracker;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.EOFException;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.Comparator;
027import java.util.List;
028import java.util.Map;
029import java.util.NavigableMap;
030import java.util.TreeMap;
031import java.util.concurrent.ForkJoinPool;
032import java.util.regex.Pattern;
033import java.util.zip.CRC32;
034import org.apache.hadoop.fs.FSDataInputStream;
035import org.apache.hadoop.fs.FSDataOutputStream;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.DoNotRetryIOException;
040import org.apache.hadoop.hbase.regionserver.StoreContext;
041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
047
048import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;
049
050/**
051 * To fully avoid listing, here we use two files for tracking. When loading, we will try to read
052 * both the two files, if only one exists, we will trust this one, if both exist, we will compare
053 * the timestamp to see which one is newer and trust that one. And we will record in memory that
054 * which one is trusted by us, and when we need to update the store file list, we will write to the
055 * other file.
056 * <p/>
057 * So in this way, we could avoid listing when we want to load the store file list file.
058 * <p/>
059 * To prevent loading partial file, we use the first 4 bytes as file length, and also append a 4
060 * bytes crc32 checksum at the end. This is because the protobuf message parser sometimes can return
061 * without error on partial bytes if you stop at some special points, but the return message will
062 * have incorrect field value. We should try our best to prevent this happens because loading an
063 * incorrect store file list file usually leads to data loss.
064 * <p/>
065 * To prevent failing silently while downgrading, where we may miss some newly introduced fields in
066 * {@link StoreFileList} which are necessary, we introduce a 'version' field in
067 * {@link StoreFileList}. If we find out that we are reading a {@link StoreFileList} with higher
068 * version, we will fail immediately and tell users that you need extra steps while downgrading, to
069 * prevent potential data loss.
070 */
071@InterfaceAudience.Private
072class StoreFileListFile {
073
074  private static final Logger LOG = LoggerFactory.getLogger(StoreFileListFile.class);
075
076  // the current version for StoreFileList
077  static final long VERSION = 1;
078
079  static final String TRACK_FILE_DIR = ".filelist";
080
081  static final String TRACK_FILE_PREFIX = "f1";
082
083  private static final String TRACK_FILE_ROTATE_PREFIX = "f2";
084
085  static final char TRACK_FILE_SEPARATOR = '.';
086
087  static final Pattern TRACK_FILE_PATTERN = Pattern.compile("^f(1|2)(\\.\\d+)?$");
088
089  // 16 MB, which is big enough for a tracker file
090  private static final int MAX_FILE_SIZE = 16 * 1024 * 1024;
091
092  private final StoreContext ctx;
093
094  private final Path trackFileDir;
095
096  private final Path[] trackFiles = new Path[2];
097
098  // this is used to make sure that we do not go backwards
099  private long prevTimestamp = -1;
100
101  private int nextTrackFile = -1;
102
103  StoreFileListFile(StoreContext ctx) {
104    this.ctx = ctx;
105    trackFileDir = new Path(ctx.getFamilyStoreDirectoryPath(), TRACK_FILE_DIR);
106  }
107
108  static StoreFileList load(FileSystem fs, Path path) throws IOException {
109    byte[] data;
110    int expectedChecksum;
111    try (FSDataInputStream in = fs.open(path)) {
112      int length = in.readInt();
113      if (length <= 0 || length > MAX_FILE_SIZE) {
114        throw new IOException("Invalid file length " + length
115          + ", either less than 0 or greater then max allowed size " + MAX_FILE_SIZE);
116      }
117      data = new byte[length];
118      in.readFully(data);
119      expectedChecksum = in.readInt();
120    }
121    CRC32 crc32 = new CRC32();
122    crc32.update(data);
123    int calculatedChecksum = (int) crc32.getValue();
124    if (expectedChecksum != calculatedChecksum) {
125      throw new IOException(
126        "Checksum mismatch, expected " + expectedChecksum + ", actual " + calculatedChecksum);
127    }
128    StoreFileList storeFileList = StoreFileList.parseFrom(data);
129    if (storeFileList.getVersion() > VERSION) {
130      LOG.error(
131        "The loaded store file list is in version {}, which is higher than expected"
132          + " version {}. Stop loading to prevent potential data loss. This usually because your"
133          + " cluster is downgraded from a newer version. You need extra steps before downgrading,"
134          + " like switching back to default store file tracker.",
135        storeFileList.getVersion(), VERSION);
136      throw new IOException("Higher store file list version detected, expected " + VERSION
137        + ", got " + storeFileList.getVersion());
138    }
139    return storeFileList;
140  }
141
142  StoreFileList load(Path path) throws IOException {
143    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
144    return load(fs, path);
145  }
146
147  private int select(StoreFileList[] lists) {
148    if (lists[0] == null) {
149      return 1;
150    }
151    if (lists[1] == null) {
152      return 0;
153    }
154    return lists[0].getTimestamp() >= lists[1].getTimestamp() ? 0 : 1;
155  }
156
157  // file sequence id to path
158  private NavigableMap<Long, List<Path>> listFiles() throws IOException {
159    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
160    FileStatus[] statuses;
161    try {
162      statuses = fs.listStatus(trackFileDir);
163    } catch (FileNotFoundException e) {
164      LOG.debug("Track file directory {} does not exist", trackFileDir, e);
165      return Collections.emptyNavigableMap();
166    }
167    if (statuses == null || statuses.length == 0) {
168      return Collections.emptyNavigableMap();
169    }
170    TreeMap<Long, List<Path>> map = new TreeMap<>(Comparator.reverseOrder());
171    for (FileStatus status : statuses) {
172      Path file = status.getPath();
173      if (!status.isFile()) {
174        LOG.warn("Found invalid track file {}, which is not a file", file);
175        continue;
176      }
177      if (!TRACK_FILE_PATTERN.matcher(file.getName()).matches()) {
178        LOG.warn("Found invalid track file {}, skip", file);
179        continue;
180      }
181      List<String> parts = Splitter.on(TRACK_FILE_SEPARATOR).splitToList(file.getName());
182      // For compatibility, set the timestamp to 0 if it is missing in the file name.
183      long timestamp = parts.size() > 1 ? Long.parseLong(parts.get(1)) : 0L;
184      map.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(file);
185    }
186    return map;
187  }
188
189  private void initializeTrackFiles(long seqId) {
190    trackFiles[0] = new Path(trackFileDir, TRACK_FILE_PREFIX + TRACK_FILE_SEPARATOR + seqId);
191    trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE_PREFIX + TRACK_FILE_SEPARATOR + seqId);
192    LOG.info("Initialized track files: {}, {}", trackFiles[0], trackFiles[1]);
193  }
194
195  private void cleanUpTrackFiles(long loadedSeqId,
196    NavigableMap<Long, List<Path>> seqId2TrackFiles) {
197    LOG.info("Cleanup track file with sequence id < {}", loadedSeqId);
198    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
199    NavigableMap<Long, List<Path>> toDelete =
200      loadedSeqId >= 0 ? seqId2TrackFiles.tailMap(loadedSeqId, false) : seqId2TrackFiles;
201    toDelete.values().stream().flatMap(l -> l.stream()).forEach(file -> {
202      ForkJoinPool.commonPool().execute(() -> {
203        LOG.info("Deleting track file {}", file);
204        try {
205          fs.delete(file, false);
206        } catch (IOException e) {
207          LOG.warn("failed to delete unused track file {}", file, e);
208        }
209      });
210    });
211  }
212
213  StoreFileList load(boolean readOnly) throws IOException {
214    NavigableMap<Long, List<Path>> seqId2TrackFiles = listFiles();
215    long seqId = -1L;
216    StoreFileList[] lists = new StoreFileList[2];
217    for (Map.Entry<Long, List<Path>> entry : seqId2TrackFiles.entrySet()) {
218      List<Path> files = entry.getValue();
219      // should not have more than 2 files, if not, it means that the track files are broken, just
220      // throw exception out and fail the region open.
221      if (files.size() > 2) {
222        throw new DoNotRetryIOException("Should only have at most 2 track files for sequence id "
223          + entry.getKey() + ", but got " + files.size() + " files: " + files);
224      }
225      boolean loaded = false;
226      for (int i = 0; i < files.size(); i++) {
227        try {
228          lists[i] = load(files.get(i));
229          loaded = true;
230        } catch (EOFException e) {
231          // this is normal case, so just log at debug
232          LOG.debug("EOF loading track file {}, ignoring the exception", trackFiles[i], e);
233        }
234      }
235      if (loaded) {
236        seqId = entry.getKey();
237        break;
238      }
239    }
240    if (readOnly) {
241      return lists[select(lists)];
242    }
243
244    cleanUpTrackFiles(seqId, seqId2TrackFiles);
245
246    if (seqId < 0) {
247      initializeTrackFiles(System.currentTimeMillis());
248      nextTrackFile = 0;
249      return null;
250    }
251
252    initializeTrackFiles(Math.max(System.currentTimeMillis(), seqId + 1));
253    int winnerIndex = select(lists);
254    nextTrackFile = 1 - winnerIndex;
255    prevTimestamp = lists[winnerIndex].getTimestamp();
256    return lists[winnerIndex];
257  }
258
259  @RestrictedApi(explanation = "Should only be called in tests", link = "",
260      allowedOnPath = ".*/StoreFileListFile.java|.*/src/test/.*")
261  static void write(FileSystem fs, Path file, StoreFileList storeFileList) throws IOException {
262    byte[] data = storeFileList.toByteArray();
263    CRC32 crc32 = new CRC32();
264    crc32.update(data);
265    int checksum = (int) crc32.getValue();
266    // 4 bytes length at the beginning, plus 4 bytes checksum
267    try (FSDataOutputStream out = fs.create(file, true)) {
268      out.writeInt(data.length);
269      out.write(data);
270      out.writeInt(checksum);
271    }
272  }
273
274  /**
275   * We will set the timestamp and version in this method so just pass the builder in
276   */
277  void update(StoreFileList.Builder builder) throws IOException {
278    if (nextTrackFile < 0) {
279      // we need to call load first to load the prevTimestamp and also the next file
280      // we are already in the update method, which is not read only, so pass false
281      load(false);
282    }
283    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
284    long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
285    write(fs, trackFiles[nextTrackFile],
286      builder.setTimestamp(timestamp).setVersion(VERSION).build());
287    // record timestamp
288    prevTimestamp = timestamp;
289    // rotate the file
290    nextTrackFile = 1 - nextTrackFile;
291
292    try {
293      fs.delete(trackFiles[nextTrackFile], false);
294    } catch (IOException e) {
295      // we will create new file with overwrite = true, so not a big deal here, only for speed up
296      // loading as we do not need to read this file when loading
297      LOG.debug("Failed to delete old track file {}, ignoring the exception",
298        trackFiles[nextTrackFile], e);
299    }
300  }
301}