001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.storefiletracker; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.List; 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.LinkedBlockingQueue; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.io.Reference; 031import org.apache.hadoop.hbase.regionserver.StoreContext; 032import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036public class StoreFileTrackerForTest extends DefaultStoreFileTracker { 037 038 private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerForTest.class); 039 private static ConcurrentMap<String, BlockingQueue<StoreFileInfo>> trackedFiles = 040 new ConcurrentHashMap<>(); 041 private String storeId; 042 043 public StoreFileTrackerForTest(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) { 044 super(conf, isPrimaryReplica, ctx); 045 if (ctx != null && ctx.getRegionFileSystem() != null) { 046 this.storeId = ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString(); 047 LOG.info("created storeId: {}", storeId); 048 trackedFiles.computeIfAbsent(storeId, v -> new LinkedBlockingQueue<>()); 049 } else { 050 LOG.info("ctx.getRegionFileSystem() returned null. Leaving storeId null."); 051 } 052 } 053 054 @Override 055 protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException { 056 LOG.info("adding to storeId: {}", storeId); 057 trackedFiles.get(storeId).addAll(newFiles); 058 } 059 060 @Override 061 protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException { 062 return new ArrayList<>(trackedFiles.get(storeId)); 063 } 064 065 public static boolean tracked(String encodedRegionName, String family, Path file) { 066 BlockingQueue<StoreFileInfo> files = trackedFiles.get(encodedRegionName + "-" + family); 067 return files != null && files.stream().anyMatch(s -> s.getPath().equals(file)); 068 } 069 070 public static void clear() { 071 trackedFiles.clear(); 072 } 073 074 @Override 075 public Reference readReference(Path p) throws IOException { 076 return super.readReference(p); 077 } 078 079}