001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.IOException; 021import java.util.AbstractMap; 022import java.util.Collection; 023import java.util.List; 024import java.util.Map; 025import java.util.UUID; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.HRegionInfo; 030import org.apache.hadoop.hbase.client.Scan; 031import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; 032import org.apache.hadoop.hbase.snapshot.SnapshotManifest; 033import org.apache.hadoop.hbase.util.CommonFSUtils; 034import org.apache.hadoop.hbase.util.ConfigurationUtil; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.apache.yetus.audience.InterfaceStability; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 041import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 042 043/** 044 * Shared implementation of mapreduce code over multiple table snapshots. Utilized by both mapreduce 045 * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormat} and mapred 046 * {@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat} implementations. 047 */ 048@InterfaceAudience.LimitedPrivate({ "HBase" }) 049@InterfaceStability.Evolving 050public class MultiTableSnapshotInputFormatImpl { 051 private static final Logger LOG = 052 LoggerFactory.getLogger(MultiTableSnapshotInputFormatImpl.class); 053 054 public static final String RESTORE_DIRS_KEY = 055 "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping"; 056 public static final String SNAPSHOT_TO_SCANS_KEY = 057 "hbase.MultiTableSnapshotInputFormat.snapshotsToScans"; 058 059 /** 060 * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of 061 * restoreDir. 062 * <p/> 063 * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY} 064 */ 065 public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans, 066 Path restoreDir) throws IOException { 067 Path rootDir = CommonFSUtils.getRootDir(conf); 068 FileSystem fs = rootDir.getFileSystem(conf); 069 070 setSnapshotToScans(conf, snapshotScans); 071 Map<String, Path> restoreDirs = 072 generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir); 073 setSnapshotDirs(conf, restoreDirs); 074 restoreSnapshots(conf, restoreDirs, fs); 075 } 076 077 /** 078 * Return the list of splits extracted from the scans/snapshots pushed to conf by 079 * {@link #setInput(Configuration, Map, Path)} 080 * @param conf Configuration to determine splits from 081 * @return Return the list of splits extracted from the scans/snapshots pushed to conf 082 */ 083 public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf) 084 throws IOException { 085 Path rootDir = CommonFSUtils.getRootDir(conf); 086 FileSystem fs = rootDir.getFileSystem(conf); 087 088 List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList(); 089 090 Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf); 091 Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf); 092 for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) { 093 String snapshotName = entry.getKey(); 094 095 Path restoreDir = snapshotsToRestoreDirs.get(snapshotName); 096 097 SnapshotManifest manifest = 098 TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs); 099 List<HRegionInfo> regionInfos = 100 TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest); 101 102 for (Scan scan : entry.getValue()) { 103 List<TableSnapshotInputFormatImpl.InputSplit> splits = 104 TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf); 105 rtn.addAll(splits); 106 } 107 } 108 return rtn; 109 } 110 111 /** 112 * Retrieve the snapshot name -> list<scan> mapping pushed to configuration by 113 * {@link #setSnapshotToScans(Configuration, Map)} 114 * @param conf Configuration to extract name -> list<scan> mappings from. 115 * @return the snapshot name -> list<scan> mapping pushed to configuration 116 */ 117 public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException { 118 119 Map<String, Collection<Scan>> rtn = Maps.newHashMap(); 120 121 for (Map.Entry<String, String> entry : ConfigurationUtil.getKeyValues(conf, 122 SNAPSHOT_TO_SCANS_KEY)) { 123 String snapshotName = entry.getKey(); 124 String scan = entry.getValue(); 125 126 Collection<Scan> snapshotScans = rtn.get(snapshotName); 127 if (snapshotScans == null) { 128 snapshotScans = Lists.newArrayList(); 129 rtn.put(snapshotName, snapshotScans); 130 } 131 132 snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan)); 133 } 134 135 return rtn; 136 } 137 138 /** 139 * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY}) 140 */ 141 public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans) 142 throws IOException { 143 // flatten out snapshotScans for serialization to the job conf 144 List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList(); 145 146 for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) { 147 String snapshotName = entry.getKey(); 148 Collection<Scan> scans = entry.getValue(); 149 150 // serialize all scans and map them to the appropriate snapshot 151 for (Scan scan : scans) { 152 snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>(snapshotName, 153 TableMapReduceUtil.convertScanToString(scan))); 154 } 155 } 156 157 ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans); 158 } 159 160 /** 161 * Retrieve the directories into which snapshots have been restored from 162 * ({@link #RESTORE_DIRS_KEY}) 163 * @param conf Configuration to extract restore directories from 164 * @return the directories into which snapshots have been restored from 165 */ 166 public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException { 167 List<Map.Entry<String, String>> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY); 168 Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size()); 169 170 for (Map.Entry<String, String> kvp : kvps) { 171 rtn.put(kvp.getKey(), new Path(kvp.getValue())); 172 } 173 174 return rtn; 175 } 176 177 public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) { 178 Map<String, String> toSet = Maps.newHashMap(); 179 180 for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) { 181 toSet.put(entry.getKey(), entry.getValue().toString()); 182 } 183 184 ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet()); 185 } 186 187 /** 188 * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and return a 189 * map from the snapshot to the restore directory. 190 * @param snapshots collection of snapshot names to restore 191 * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored 192 * @return a mapping from snapshot name to the directory in which that snapshot has been restored 193 */ 194 private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots, 195 Path baseRestoreDir) { 196 Map<String, Path> rtn = Maps.newHashMap(); 197 198 for (String snapshotName : snapshots) { 199 Path restoreSnapshotDir = 200 new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString()); 201 rtn.put(snapshotName, restoreSnapshotDir); 202 } 203 204 return rtn; 205 } 206 207 /** 208 * Restore each (snapshot name, restore directory) pair in snapshotToDir 209 * @param conf configuration to restore with 210 * @param snapshotToDir mapping from snapshot names to restore directories 211 * @param fs filesystem to do snapshot restoration on 212 */ 213 public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs) 214 throws IOException { 215 // TODO: restore from record readers to parallelize. 216 Path rootDir = CommonFSUtils.getRootDir(conf); 217 218 for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) { 219 String snapshotName = entry.getKey(); 220 Path restoreDir = entry.getValue(); 221 LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir 222 + " for MultiTableSnapshotInputFormat"); 223 restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs); 224 } 225 } 226 227 void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir, 228 FileSystem fs) throws IOException { 229 RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName); 230 } 231 232}