/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce implementation of {@link BackupCopyJob}. There are two types of copy operation: one
 * copies from a snapshot (full backup) by extending {@link ExportSnapshot}, the other copies
 * incremental log files by extending {@link DistCp}.
 */
@InterfaceAudience.Private
public class MapReduceBackupCopyJob implements BackupCopyJob {
  public static final String NUMBER_OF_LEVELS_TO_PRESERVE_KEY = "num.levels.preserve";
  private static final Logger LOG = LoggerFactory.getLogger(MapReduceBackupCopyJob.class);

  private Configuration conf;

  // Accumulated progress within the whole backup process for the copy operation
  private float progressDone = 0.1f;
  private long bytesCopied = 0;
  private static final float INIT_PROGRESS = 0.1f;

  // The percentage of the current copy task within the whole task, if multiple copy passes are
  // needed. The default value is 100%, which means there is only one copy task for the whole
  // backup.
  private float subTaskPercntgInWholeTask = 1f;
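
  // Illustrative caller sketch (placeholder paths, not a documented contract): a driver such as
  // the backup client is expected to hand this job a configuration and the copy arguments, with
  // the destination directory as the last argument for the DistCp-based incremental case:
  //
  //   BackupCopyJob copyJob = new MapReduceBackupCopyJob();
  //   copyJob.setConf(new Configuration(conf));
  //   int rc = copyJob.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL,
  //     new String[] { "hdfs://src/WALs/wal.1", "hdfs://dest/backup/incr" });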

  public MapReduceBackupCopyJob() {
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Get the current copy task percentage within the whole task if multiple copies are needed.
   * @return the current copy task percentage
   */
  public float getSubTaskPercntgInWholeTask() {
    return subTaskPercntgInWholeTask;
  }

  /**
   * Set the current copy task percentage within the whole task if multiple copies are needed. Must
   * be called before calling
   * {@link #copy(BackupInfo, BackupManager, Configuration, BackupType, String[])}
   * @param subTaskPercntgInWholeTask The percentage of the copy subtask
   */
  public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
    this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
  }

  static class SnapshotCopy extends ExportSnapshot {
    private BackupInfo backupInfo;
    private TableName table;

    public SnapshotCopy(BackupInfo backupInfo, TableName table) {
      super();
      this.backupInfo = backupInfo;
      this.table = table;
    }

    public TableName getTable() {
      return this.table;
    }

    public BackupInfo getBackupInfo() {
      return this.backupInfo;
    }
  }

  /**
   * Update the ongoing backup with new progress.
   * @param backupInfo  backup info
   * @param newProgress progress
   * @param bytesCopied bytes copied
   * @throws IOException exception
   */
  static void updateProgress(BackupInfo backupInfo, BackupManager backupManager, int newProgress,
    long bytesCopied) throws IOException {
    // compose the new backup progress data, using fake number for now
    String backupProgressData = newProgress + "%";

    backupInfo.setProgress(newProgress);
    backupManager.updateBackupInfo(backupInfo);
    LOG.debug("Backup progress data \"" + backupProgressData
      + "\" has been updated to backup system table for " + backupInfo.getBackupId());
  }
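
  // How the overall progress figure is derived (a worked example with illustrative numbers):
  // BackupDistCp.execute() below computes
  //   newProgress = progressDone + mapProgress * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS)
  // With the defaults INIT_PROGRESS = 0.1 and subTaskPercntgInWholeTask = 1.0, a DistCp map
  // progress of 0.5 yields 0.1 + 0.5 * 1.0 * 0.9 = 0.55, rounded and reported as "55.0%", with
  // 55 written to the backup system table. The scaling by (1 - INIT_PROGRESS) reserves the first
  // 10% of the progress range for the backup work that completed before the copy started.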

  /**
   * Extends DistCp for progress updating to the backup system table during backup, using DistCpV2
   * (MAPREDUCE-2765). It simply extends DistCp and overrides the execute() method to get the Job
   * reference for progress updating. Only the argument form "src1, [src2, [...]] dst" is
   * supported; no other DistCp options are accepted.
   */
  class BackupDistCp extends DistCp {

    private BackupInfo backupInfo;
    private BackupManager backupManager;

    public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupInfo,
      BackupManager backupManager) throws Exception {
      super(conf, options);
      this.backupInfo = backupInfo;
      this.backupManager = backupManager;
    }

    @Override
    public Job execute() throws Exception {
      // Reflection preparation for private methods and fields of DistCp. Note that cleanup() and
      // the submitted field are resolved and made accessible here but are not otherwise used
      // below; only the options field is read.
      Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
      Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");

      Field fieldInputOptions = getInputOptionsField(classDistCp);
      Field fieldSubmitted = classDistCp.getDeclaredField("submitted");

      methodCleanup.setAccessible(true);
      fieldInputOptions.setAccessible(true);
      fieldSubmitted.setAccessible(true);

      // execute() logic starts here
      assert fieldInputOptions.get(this) != null;

      Job job = null;
      try {
        List<Path> srcs = getSourcePaths(fieldInputOptions);

        long totalSrcLength = 0;
        for (Path aSrc : srcs) {
          totalSrcLength += BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc);
        }

        // Async call
        job = super.execute();
        // Update the copy progress in the system table whenever the value changes, polling at
        // the configured frequency (default: every 500 ms)
        int progressReportFreq = MapReduceBackupCopyJob.this.getConf()
          .getInt("hbase.backup.progressreport.frequency", 500);
        float lastProgress = progressDone;
        while (!job.isComplete()) {
          float newProgress =
            progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);

          if (newProgress > lastProgress) {
            BigDecimal progressData =
              new BigDecimal(newProgress * 100).setScale(1, RoundingMode.HALF_UP);
            String newProgressStr = progressData + "%";
            LOG.info("Progress: " + newProgressStr);
            updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
            LOG.debug("Backup progress data updated to backup system table: \"Progress: "
              + newProgressStr + ".\"");
            lastProgress = newProgress;
          }
          Thread.sleep(progressReportFreq);
        }
        // Update the progress data after the copy job completes
        float newProgress =
          progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
        BigDecimal progressData =
          new BigDecimal(newProgress * 100).setScale(1, RoundingMode.HALF_UP);

        String newProgressStr = progressData + "%";
        LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask
          + " mapProgress: " + job.mapProgress());

        // Accumulate the overall backup progress
        progressDone = newProgress;
        bytesCopied += totalSrcLength;

        updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
        LOG.debug("Backup progress data updated to backup system table: \"Progress: "
          + newProgressStr + " - " + bytesCopied + " bytes copied.\"");
      } catch (Throwable t) {
        LOG.error(t.toString(), t);
        throw t;
      }

      String jobID = job.getJobID().toString();
      job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);

      LOG.debug(
        "DistCp job-id: " + jobID + " completed: " + job.isComplete() + " " + job.isSuccessful());
      Counters ctrs = job.getCounters();
      LOG.debug(Objects.toString(ctrs));
      if (job.isComplete() && !job.isSuccessful()) {
        throw new Exception("DistCp job-id: " + jobID + " failed");
      }

      return job;
    }
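
    // A note on the version skew handled by the reflection helpers below (a sketch inferred from
    // the field names probed there, not from any public DistCp contract): Hadoop 2.x DistCp keeps
    // its options in a private "inputOptions" field of type DistCpOptions, while Hadoop 3.x moved
    // them into a private "context" field of type org.apache.hadoop.tools.DistCpContext. In the
    // Hadoop 3 shape the source paths must themselves be fetched reflectively, e.g.:
    //
    //   Field f = DistCp.class.getDeclaredField("context"); // Hadoop 3.x layout
    //   f.setAccessible(true);
    //   Object context = f.get(distCpInstance); // a DistCpContext
    //   Method m = context.getClass().getDeclaredMethod("getSourcePaths");
    //   m.setAccessible(true);
    //   List<Path> srcs = (List<Path>) m.invoke(context);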

    private Field getInputOptionsField(Class<?> classDistCp) throws IOException {
      Field f = null;
      try {
        f = classDistCp.getDeclaredField("inputOptions");
      } catch (Exception e) {
        // Hadoop 3
        try {
          f = classDistCp.getDeclaredField("context");
        } catch (NoSuchFieldException | SecurityException e1) {
          throw new IOException(e1);
        }
      }
      return f;
    }

    @SuppressWarnings("unchecked")
    private List<Path> getSourcePaths(Field fieldInputOptions) throws IOException {
      Object options;
      try {
        options = fieldInputOptions.get(this);
        if (options instanceof DistCpOptions) {
          return ((DistCpOptions) options).getSourcePaths();
        } else {
          // Hadoop 3
          Class<?> classContext = Class.forName("org.apache.hadoop.tools.DistCpContext");
          Method methodGetSourcePaths = classContext.getDeclaredMethod("getSourcePaths");
          methodGetSourcePaths.setAccessible(true);

          return (List<Path>) methodGetSourcePaths.invoke(options);
        }
      } catch (IllegalArgumentException | IllegalAccessException | ClassNotFoundException
        | NoSuchMethodException | SecurityException | InvocationTargetException e) {
        throw new IOException(e);
      }
    }

    @Override
    protected Path createInputFileListing(Job job) throws IOException {
      if (conf.get(NUMBER_OF_LEVELS_TO_PRESERVE_KEY) == null) {
        return super.createInputFileListing(job);
      }
      long totalBytesExpected = 0;
      int totalRecords = 0;
      Path fileListingPath = getFileListingPath();
      try (SequenceFile.Writer writer = getWriter(fileListingPath)) {
        List<Path> srcFiles = getSourceFiles();
        if (srcFiles.size() == 0) {
          return fileListingPath;
        }
        totalRecords = srcFiles.size();
        FileSystem fs = srcFiles.get(0).getFileSystem(conf);
        for (Path path : srcFiles) {
          FileStatus fst = fs.getFileStatus(path);
          totalBytesExpected += fst.getLen();
          Text key = getKey(path);
          writer.append(key, new CopyListingFileStatus(fst));
        }
        writer.close();

        // Update the job's configuration
        Configuration cfg = job.getConfiguration();
        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, totalBytesExpected);
        cfg.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, fileListingPath.toString());
        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, totalRecords);
      } catch (NoSuchFieldException | SecurityException | IllegalArgumentException
        | IllegalAccessException | NoSuchMethodException | ClassNotFoundException
        | InvocationTargetException e) {
        throw new IOException(e);
      }
      return fileListingPath;
    }

    private Text getKey(Path path) {
      int level = conf.getInt(NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 1);
      int count = 0;
      String relPath = "";
      while (count++ < level) {
        relPath = Path.SEPARATOR + path.getName() + relPath;
        path = path.getParent();
      }
      return new Text(relPath);
    }
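
    // Worked example for getKey() (paths are illustrative): with num.levels.preserve = 2 and a
    // source file hdfs://ns/oldWALs/wal.12345, the loop keeps the last two path components, so
    // the listing key becomes "/oldWALs/wal.12345" and that relative layout is preserved under
    // the DistCp target directory. With the default of 1, only "/wal.12345" is kept.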

    private List<Path> getSourceFiles() throws NoSuchFieldException, SecurityException,
      IllegalArgumentException, IllegalAccessException, NoSuchMethodException,
      ClassNotFoundException, InvocationTargetException, IOException {
      Field options = null;
      try {
        options = DistCp.class.getDeclaredField("inputOptions");
      } catch (NoSuchFieldException | SecurityException e) {
        // Hadoop 3
        options = DistCp.class.getDeclaredField("context");
      }
      options.setAccessible(true);
      return getSourcePaths(options);
    }

    private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
      FileSystem fs = pathToListFile.getFileSystem(conf);
      fs.delete(pathToListFile, false);
      return SequenceFile.createWriter(conf, SequenceFile.Writer.file(pathToListFile),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(CopyListingFileStatus.class),
        SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
    }

  }

  /**
   * Do backup copy based on different types.
   * @param context       The backup info
   * @param backupManager The backup manager
   * @param conf          The hadoop configuration
   * @param copyType      The backup copy type
   * @param options       Options for customized ExportSnapshot or DistCp
   * @return the exit code of the underlying copy job (0 on success)
   * @throws IOException exception
   */
  @Override
  public int copy(BackupInfo context, BackupManager backupManager, Configuration conf,
    BackupType copyType, String[] options) throws IOException {
    int res = 0;

    try {
      if (copyType == BackupType.FULL) {
        SnapshotCopy snapshotCp = new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
        LOG.debug("Doing SNAPSHOT_COPY");
        // Make a new instance of conf to be used by the snapshot copy class.
        snapshotCp.setConf(new Configuration(conf));
        res = snapshotCp.run(options);
      } else if (copyType == BackupType.INCREMENTAL) {
        LOG.debug("Doing COPY_TYPE_DISTCP");
        setSubTaskPercntgInWholeTask(1f);

        BackupDistCp distcp =
          new BackupDistCp(new Configuration(conf), null, context, backupManager);
        // Handle a special case where the source is a single file. In this case, DistCp will not
        // create the target directory; it just takes the target as a file name and copies the
        // source file to it. We need to create the target directory before running DistCp.
        LOG.debug("DistCp options: " + Arrays.toString(options));
        Path dest = new Path(options[options.length - 1]);
        String[] newOptions = new String[options.length + 1];
        System.arraycopy(options, 0, newOptions, 1, options.length);
        newOptions[0] = "-async"; // run DistCp in async mode
        FileSystem destfs = dest.getFileSystem(conf);
        if (!destfs.exists(dest)) {
          destfs.mkdirs(dest);
        }
        res = distcp.run(newOptions);
      }
      return res;
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  @Override
  public void cancel(String jobId) throws IOException {
    JobID id = JobID.forName(jobId);
    Cluster cluster = new Cluster(this.getConf());
    try {
      Job job = cluster.getJob(id);
      if (job == null) {
        LOG.error("No job found for " + id);
        // TODO: should we throw an exception?
        return;
      }
      if (job.isComplete() || job.isRetired()) {
        return;
      }

      job.killJob();
      LOG.debug("Killed copy job " + id);
    } catch (InterruptedException e) {
      throw new IOException(e);
    }
  }
}