/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.Function;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;

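/**
 * Helpers for translating the {@link BucketCache} backing map and metadata to and from the
 * protobuf format used when persisting the cache state to a file.
 */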
@InterfaceAudience.Private
final class BucketProtoUtils {

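  /** Magic bytes written at the head of the persistence file for the V2 (chunked) format. */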
  static final byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' };

  private BucketProtoUtils() {
  }

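  /**
   * Builds the top-level {@code BucketCacheEntry} message holding the cache metadata: capacity,
   * IOEngine and map class names, deserializer ids, the per-file cached-size map, the backing map
   * and the IOEngine checksum.
   */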
  static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache,
    BucketCacheProtos.BackingMap.Builder backingMapBuilder) {
    return BucketCacheProtos.BucketCacheEntry.newBuilder().setCacheCapacity(cache.getMaxSize())
      .setIoClass(cache.ioEngine.getClass().getName())
      .setMapClass(cache.backingMap.getClass().getName())
      .putAllDeserializers(CacheableDeserializerIdManager.save())
      .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles))
      .setBackingMap(backingMapBuilder.build())
      .setChecksum(ByteString
        .copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm())))
      .build();
  }

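  /**
   * Serializes the cache state to the given stream: the V2 magic bytes, then the delimited
   * metadata message, then the backing map entries written as delimited chunks of at most
   * {@code chunkSize} entries each.
   */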
  public static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize)
    throws IOException {
    // Write the new version of the magic number.
    fos.write(PB_MAGIC_V2);

    BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
    BucketCacheProtos.BackingMapEntry.Builder entryBuilder =
      BucketCacheProtos.BackingMapEntry.newBuilder();

    // Persist the metadata first.
    toPB(cache, builder).writeDelimitedTo(fos);

    int blockCount = 0;
    // Persist backing map entries in chunks of size 'chunkSize'.
    for (Map.Entry<BlockCacheKey, BucketEntry> entry : cache.backingMap.entrySet()) {
      blockCount++;
      addEntryToBuilder(entry, entryBuilder, builder);
      if (blockCount % chunkSize == 0) {
        builder.build().writeDelimitedTo(fos);
        builder.clear();
      }
    }
    // Persist the last, possibly partial, chunk.
    if (builder.getEntryList().size() > 0) {
      builder.build().writeDelimitedTo(fos);
    }
  }

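  /**
   * Converts a single backing-map entry to protobuf and appends it to the chunk builder, reusing
   * the supplied entry builder.
   */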
  private static void addEntryToBuilder(Map.Entry<BlockCacheKey, BucketEntry> entry,
    BucketCacheProtos.BackingMapEntry.Builder entryBuilder,
    BucketCacheProtos.BackingMap.Builder builder) {
    entryBuilder.clear();
    entryBuilder.setKey(BucketProtoUtils.toPB(entry.getKey()));
    entryBuilder.setValue(BucketProtoUtils.toPB(entry.getValue()));
    builder.addEntry(entryBuilder.build());
  }

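  /** Converts a {@link BlockCacheKey} to its protobuf representation. */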
  private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) {
    return BucketCacheProtos.BlockCacheKey.newBuilder().setHfilename(key.getHfileName())
      .setOffset(key.getOffset()).setPrimaryReplicaBlock(key.isPrimary())
      .setBlockType(toPB(key.getBlockType())).build();
  }

  private static BucketCacheProtos.BlockType toPB(BlockType blockType) {
    switch (blockType) {
      case DATA:
        return BucketCacheProtos.BlockType.data;
      case META:
        return BucketCacheProtos.BlockType.meta;
      case TRAILER:
        return BucketCacheProtos.BlockType.trailer;
      case INDEX_V1:
        return BucketCacheProtos.BlockType.index_v1;
      case FILE_INFO:
        return BucketCacheProtos.BlockType.file_info;
      case LEAF_INDEX:
        return BucketCacheProtos.BlockType.leaf_index;
      case ROOT_INDEX:
        return BucketCacheProtos.BlockType.root_index;
      case BLOOM_CHUNK:
        return BucketCacheProtos.BlockType.bloom_chunk;
      case ENCODED_DATA:
        return BucketCacheProtos.BlockType.encoded_data;
      case GENERAL_BLOOM_META:
        return BucketCacheProtos.BlockType.general_bloom_meta;
      case INTERMEDIATE_INDEX:
        return BucketCacheProtos.BlockType.intermediate_index;
      case DELETE_FAMILY_BLOOM_META:
        return BucketCacheProtos.BlockType.delete_family_bloom_meta;
      default:
        throw new Error("Unrecognized BlockType.");
    }
  }

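  /** Converts a {@link BucketEntry} to its protobuf representation. */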
  private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) {
    return BucketCacheProtos.BucketEntry.newBuilder().setOffset(entry.offset())
      .setCachedTime(entry.getCachedTime()).setLength(entry.getLength())
      .setDiskSizeWithHeader(entry.getOnDiskSizeWithHeader())
      .setDeserialiserIndex(entry.deserializerIndex).setAccessCounter(entry.getAccessCounter())
      .setPriority(toPB(entry.getPriority())).build();
  }

  private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) {
    switch (p) {
      case MULTI:
        return BucketCacheProtos.BlockPriority.multi;
      case MEMORY:
        return BucketCacheProtos.BlockPriority.memory;
      case SINGLE:
        return BucketCacheProtos.BlockPriority.single;
      default:
        throw new Error("Unrecognized BlockPriority.");
    }
  }

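  /**
   * Rebuilds the in-memory backing map from a persisted {@code BackingMap} chunk. Returns the
   * reconstructed map together with a navigable set of its keys ordered by HFile name and offset.
   * @throws IOException if a persisted deserializer index has no matching deserializer class in
   *                     this runtime
   */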
  static Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> fromPB(
    Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap,
    Function<BucketEntry, Recycler> createRecycler) throws IOException {
    ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
    NavigableSet<BlockCacheKey> resultSet = new ConcurrentSkipListSet<>(Comparator
      .comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));
    for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
      BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
      BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
        protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
      BucketCacheProtos.BucketEntry protoValue = entry.getValue();
      // TODO: We use ByteBuffAllocator.HEAP here because there is no elegant way to get the
      // ByteBuffAllocator created by the RpcServer.
      BucketEntry value = new BucketEntry(protoValue.getOffset(), protoValue.getLength(),
        protoValue.getDiskSizeWithHeader(), protoValue.getAccessCounter(),
        protoValue.getCachedTime(),
        protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory, createRecycler,
        ByteBuffAllocator.HEAP);
      // This is the deserializer index that we stored.
      int oldIndex = protoValue.getDeserialiserIndex();
      String deserializerClass = deserializers.get(oldIndex);
      if (deserializerClass == null) {
        throw new IOException("Found deserializer index without matching entry.");
      }
      // Convert it to the identifier for the deserializer that we have in this runtime.
      if (deserializerClass.equals(HFileBlock.BlockDeserializer.class.getName())) {
        int actualIndex = HFileBlock.BLOCK_DESERIALIZER.getDeserializerIdentifier();
        value.deserializerIndex = (byte) actualIndex;
      } else {
        // We could make this more pluggable, but right now HFileBlock is the only implementation
        // of Cacheable outside of tests, so this might not ever matter.
        throw new IOException("Unknown deserializer class found: " + deserializerClass);
      }
      result.put(key, value);
      resultSet.add(key);
    }
    return new Pair<>(result, resultSet);
  }

  private static BlockType fromPb(BucketCacheProtos.BlockType blockType) {
    switch (blockType) {
      case data:
        return BlockType.DATA;
      case meta:
        return BlockType.META;
      case trailer:
        return BlockType.TRAILER;
      case index_v1:
        return BlockType.INDEX_V1;
      case file_info:
        return BlockType.FILE_INFO;
      case leaf_index:
        return BlockType.LEAF_INDEX;
      case root_index:
        return BlockType.ROOT_INDEX;
      case bloom_chunk:
        return BlockType.BLOOM_CHUNK;
      case encoded_data:
        return BlockType.ENCODED_DATA;
      case general_bloom_meta:
        return BlockType.GENERAL_BLOOM_META;
      case intermediate_index:
        return BlockType.INTERMEDIATE_INDEX;
      case delete_family_bloom_meta:
        return BlockType.DELETE_FAMILY_BLOOM_META;
      default:
        throw new Error("Unrecognized BlockType.");
    }
  }

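  /**
   * Converts the fully-cached-files map (HFile name to region name and cached size) to its
   * protobuf representation.
   */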
  static Map<String, BucketCacheProtos.RegionFileSizeMap>
    toCachedPB(Map<String, Pair<String, Long>> prefetchedHfileNames) {
    Map<String, BucketCacheProtos.RegionFileSizeMap> tmpMap = new HashMap<>();
    prefetchedHfileNames.forEach((hfileName, regionPrefetchMap) -> {
      BucketCacheProtos.RegionFileSizeMap tmpRegionFileSize =
        BucketCacheProtos.RegionFileSizeMap.newBuilder().setRegionName(regionPrefetchMap.getFirst())
          .setRegionCachedSize(regionPrefetchMap.getSecond()).build();
      tmpMap.put(hfileName, tmpRegionFileSize);
    });
    return tmpMap;
  }

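  /**
   * Rebuilds the fully-cached-files map (HFile name to region name and cached size) from its
   * protobuf representation.
   */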
  static Map<String, Pair<String, Long>>
    fromPB(Map<String, BucketCacheProtos.RegionFileSizeMap> prefetchHFileNames) {
    Map<String, Pair<String, Long>> hfileMap = new HashMap<>();
    prefetchHFileNames.forEach((hfileName, regionPrefetchMap) -> {
      hfileMap.put(hfileName,
        new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionCachedSize()));
    });
    return hfileMap;
  }
}