/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce implementation of {@link BackupCopyJob}. Two types of copy operation are supported:
 * copying from a snapshot, which extends {@link ExportSnapshot}, and copying incremental log
 * files, which extends {@link DistCp}.
 */
@InterfaceAudience.Private
public class MapReduceBackupCopyJob implements BackupCopyJob {
  public static final String NUMBER_OF_LEVELS_TO_PRESERVE_KEY = "num.levels.preserve";
  private static final Logger LOG = LoggerFactory.getLogger(MapReduceBackupCopyJob.class);

  private Configuration conf;

  // Accumulated progress within the whole backup process for the copy operation
  private float progressDone = 0.1f;
  private long bytesCopied = 0;
  private static final float INIT_PROGRESS = 0.1f;

  // The share of the whole backup task taken by the current copy task when multiple copies are
  // needed. The default value is 100%, meaning a single copy task covers the whole backup.
  private float subTaskPercntgInWholeTask = 1f;

  public MapReduceBackupCopyJob() {
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Get the current copy task percentage within the whole task if multiple copies are needed.
   * @return the current copy task percentage
   */
  public float getSubTaskPercntgInWholeTask() {
    return subTaskPercntgInWholeTask;
  }

  /**
   * Set the current copy task percentage within the whole task if multiple copies are needed. Must
   * be called before calling
   * {@link #copy(BackupInfo, BackupManager, Configuration, BackupType, String[])}
   * @param subTaskPercntgInWholeTask The percentage of the copy subtask
   */
  public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
    this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
  }
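
  // Illustration of the progress split (hypothetical numbers): with
  // subTaskPercntgInWholeTask = 0.5f, a copy pass that reaches 100% map progress advances the
  // overall backup progress by 0.5 * (1 - INIT_PROGRESS) = 45 percentage points, so two such
  // passes together cover the full post-initialization budget.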

  static class SnapshotCopy extends ExportSnapshot {
    private BackupInfo backupInfo;
    private TableName table;

    public SnapshotCopy(BackupInfo backupInfo, TableName table) {
      super();
      this.backupInfo = backupInfo;
      this.table = table;
    }

    public TableName getTable() {
      return this.table;
    }

    public BackupInfo getBackupInfo() {
      return this.backupInfo;
    }
  }
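
  // Usage sketch (hypothetical snapshot name and destination path): SnapshotCopy is driven like
  // ExportSnapshot via Tool.run; copy() below invokes it with caller-supplied arguments, e.g.
  //   new SnapshotCopy(backupInfo, table)
  //     .run(new String[] { "-snapshot", "snapshot_t1", "-copy-to", "hdfs://backup/t1" });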

  /**
   * Update the ongoing backup with new progress.
   * @param backupInfo    backup info
   * @param backupManager backup manager
   * @param newProgress   progress
   * @param bytesCopied   bytes copied
   * @throws IOException exception
   */
  static void updateProgress(BackupInfo backupInfo, BackupManager backupManager, int newProgress,
    long bytesCopied) throws IOException {
    // compose the new backup progress data, using fake number for now
    String backupProgressData = newProgress + "%";

    backupInfo.setProgress(newProgress);
    backupManager.updateBackupInfo(backupInfo);
    LOG.debug("Backup progress data \"" + backupProgressData
      + "\" has been updated to backup system table for " + backupInfo.getBackupId());
  }

  /**
   * Extends DistCp to update progress in the backup system table while a backup is running. Uses
   * DistCpV2 (MAPREDUCE-2765): execute() is overridden to obtain the Job reference for progress
   * updating. Only the argument form "src1, [src2, [...]] dst" is supported; no other DistCp
   * options are accepted.
   */
  class BackupDistCp extends DistCp {

    private BackupInfo backupInfo;
    private BackupManager backupManager;

    public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupInfo,
      BackupManager backupManager) throws Exception {
      super(conf, options);
      this.backupInfo = backupInfo;
      this.backupManager = backupManager;
    }

    @Override
    public Job execute() throws Exception {

      // reflection preparation for private methods and fields
      Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
      Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");

      Field fieldInputOptions = getInputOptionsField(classDistCp);
      Field fieldSubmitted = classDistCp.getDeclaredField("submitted");

      methodCleanup.setAccessible(true);
      fieldInputOptions.setAccessible(true);
      fieldSubmitted.setAccessible(true);

      // execute() logic starts here
      assert fieldInputOptions.get(this) != null;

      Job job = null;
      try {
        List<Path> srcs = getSourcePaths(fieldInputOptions);

        long totalSrcLgth = 0;
        for (Path aSrc : srcs) {
          totalSrcLgth += BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc);
        }

        // Async call
        job = super.execute();
        // Update the copy progress in the system table every 0.5s if the progress value changed
        int progressReportFreq = MapReduceBackupCopyJob.this.getConf()
          .getInt("hbase.backup.progressreport.frequency", 500);
        float lastProgress = progressDone;
        while (!job.isComplete()) {
          float newProgress =
            progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);

          if (newProgress > lastProgress) {
            BigDecimal progressData =
              new BigDecimal(newProgress * 100).setScale(1, RoundingMode.HALF_UP);
            String newProgressStr = progressData + "%";
            LOG.info("Progress: " + newProgressStr);
            updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
            LOG.debug("Backup progress data updated to backup system table: \"Progress: "
              + newProgressStr + ".\"");
            lastProgress = newProgress;
          }
          Thread.sleep(progressReportFreq);
        }
        // update the progress data after the copy job completes
        float newProgress =
          progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
        BigDecimal progressData =
          new BigDecimal(newProgress * 100).setScale(1, RoundingMode.HALF_UP);

        String newProgressStr = progressData + "%";
        LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask
          + " mapProgress: " + job.mapProgress());

        // accumulate the overall backup progress
        progressDone = newProgress;
        bytesCopied += totalSrcLgth;

        updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
        LOG.debug("Backup progress data updated to backup system table: \"Progress: "
          + newProgressStr + " - " + bytesCopied + " bytes copied.\"");
      } catch (Throwable t) {
        LOG.error(t.toString(), t);
        throw t;
      }

      String jobID = job.getJobID().toString();
      job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);

      LOG.debug(
        "DistCp job-id: " + jobID + " completed: " + job.isComplete() + " " + job.isSuccessful());
      Counters ctrs = job.getCounters();
      LOG.debug(Objects.toString(ctrs));
      if (job.isComplete() && !job.isSuccessful()) {
        throw new Exception("DistCp job-id: " + jobID + " failed");
      }

      return job;
    }
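
    // Worked example of the progress math above (illustrative numbers): with progressDone = 0.1
    // (INIT_PROGRESS), subTaskPercntgInWholeTask = 1.0f and job.mapProgress() = 0.5,
    // newProgress = 0.1 + 0.5 * 1.0 * (1 - 0.1) = 0.55, which is reported as "55.0%".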

    private Field getInputOptionsField(Class<?> classDistCp) throws IOException {
      Field f = null;
      try {
        f = classDistCp.getDeclaredField("inputOptions");
      } catch (Exception e) {
        // Hadoop 3
        try {
          f = classDistCp.getDeclaredField("context");
        } catch (NoSuchFieldException | SecurityException e1) {
          throw new IOException(e1);
        }
      }
      return f;
    }

    @SuppressWarnings("unchecked")
    private List<Path> getSourcePaths(Field fieldInputOptions) throws IOException {
      Object options;
      try {
        options = fieldInputOptions.get(this);
        if (options instanceof DistCpOptions) {
          return ((DistCpOptions) options).getSourcePaths();
        } else {
          // Hadoop 3
          Class<?> classContext = Class.forName("org.apache.hadoop.tools.DistCpContext");
          Method methodGetSourcePaths = classContext.getDeclaredMethod("getSourcePaths");
          methodGetSourcePaths.setAccessible(true);

          return (List<Path>) methodGetSourcePaths.invoke(options);
        }
      } catch (IllegalArgumentException | IllegalAccessException | ClassNotFoundException
        | NoSuchMethodException | SecurityException | InvocationTargetException e) {
        throw new IOException(e);
      }
    }

    @Override
    protected Path createInputFileListing(Job job) throws IOException {
      if (conf.get(NUMBER_OF_LEVELS_TO_PRESERVE_KEY) == null) {
        return super.createInputFileListing(job);
      }
      long totalBytesExpected = 0;
      int totalRecords = 0;
      Path fileListingPath = getFileListingPath();
      try (SequenceFile.Writer writer = getWriter(fileListingPath)) {
        List<Path> srcFiles = getSourceFiles();
        if (srcFiles.size() == 0) {
          return fileListingPath;
        }
        totalRecords = srcFiles.size();
        FileSystem fs = srcFiles.get(0).getFileSystem(conf);
        for (Path path : srcFiles) {
          FileStatus fst = fs.getFileStatus(path);
          totalBytesExpected += fst.getLen();
          Text key = getKey(path);
          writer.append(key, new CopyListingFileStatus(fst));
        }
        writer.close();

        // update the job's configuration
        Configuration cfg = job.getConfiguration();
        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, totalBytesExpected);
        cfg.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, fileListingPath.toString());
        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, totalRecords);
      } catch (NoSuchFieldException | SecurityException | IllegalArgumentException
        | IllegalAccessException | NoSuchMethodException | ClassNotFoundException
        | InvocationTargetException e) {
        throw new IOException(e);
      }
      return fileListingPath;
    }

    private Text getKey(Path path) {
      int level = conf.getInt(NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 1);
      int count = 0;
      String relPath = "";
      while (count++ < level) {
        relPath = Path.SEPARATOR + path.getName() + relPath;
        path = path.getParent();
      }
      return new Text(relPath);
    }
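
    // Example (hypothetical path): with num.levels.preserve = 2, a source file /a/b/c/part-0
    // yields the relative key "/c/part-0", so the last two path components are preserved under
    // the target directory.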

    private List<Path> getSourceFiles() throws NoSuchFieldException, SecurityException,
      IllegalArgumentException, IllegalAccessException, NoSuchMethodException,
      ClassNotFoundException, InvocationTargetException, IOException {
      Field options = null;
      try {
        options = DistCp.class.getDeclaredField("inputOptions");
      } catch (NoSuchFieldException | SecurityException e) {
        // Hadoop 3
        options = DistCp.class.getDeclaredField("context");
      }
      options.setAccessible(true);
      return getSourcePaths(options);
    }

    private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
      FileSystem fs = pathToListFile.getFileSystem(conf);
      fs.delete(pathToListFile, false);
      return SequenceFile.createWriter(conf, SequenceFile.Writer.file(pathToListFile),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(CopyListingFileStatus.class),
        SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
    }
  }

  /**
   * Do backup copy based on different types.
   * @param context       The backup info
   * @param backupManager The backup manager
   * @param conf          The hadoop configuration
   * @param copyType      The backup copy type
   * @param options       Options for customized ExportSnapshot or DistCp
   * @return result code (0 if successful)
   * @throws IOException exception
   */
  @Override
  public int copy(BackupInfo context, BackupManager backupManager, Configuration conf,
    BackupType copyType, String[] options) throws IOException {
    int res = 0;

    try {
      if (copyType == BackupType.FULL) {
        SnapshotCopy snapshotCp = new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
        LOG.debug("Doing SNAPSHOT_COPY");
        // Make a new instance of conf to be used by the snapshot copy class.
        snapshotCp.setConf(new Configuration(conf));
        res = snapshotCp.run(options);
      } else if (copyType == BackupType.INCREMENTAL) {
        LOG.debug("Doing COPY_TYPE_DISTCP");
        setSubTaskPercntgInWholeTask(1f);

        BackupDistCp distcp =
          new BackupDistCp(new Configuration(conf), null, context, backupManager);
        // Handle a special case where the source is a single file. In this case, DistCp will not
        // create the target dir; it just takes the target as a file name and copies the source
        // file to the target (as a file name). We need to create the target dir before running
        // DistCp.
        LOG.debug("DistCp options: " + Arrays.toString(options));
        Path dest = new Path(options[options.length - 1]);
        String[] newOptions = new String[options.length + 1];
        System.arraycopy(options, 0, newOptions, 1, options.length);
        newOptions[0] = "-async"; // run DistCp in async mode
        FileSystem destfs = dest.getFileSystem(conf);
        if (!destfs.exists(dest)) {
          destfs.mkdirs(dest);
        }
        res = distcp.run(newOptions);
      }
      return res;
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
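
  // Usage sketch (hypothetical paths; the real argument arrays are built by the backup clients):
  // for an incremental copy the options are plain DistCp "src... dst" arguments, e.g.
  //   copyJob.copy(info, manager, conf, BackupType.INCREMENTAL,
  //     new String[] { "hdfs://src/WALs/file1", "hdfs://backup/incr" });
  // For a full backup the options are ExportSnapshot-style arguments in which options[1] is the
  // snapshot name, e.g.
  //   copyJob.copy(info, manager, conf, BackupType.FULL,
  //     new String[] { "-snapshot", "snapshot_t1", "-copy-to", "hdfs://backup/t1" });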

  @Override
  public void cancel(String jobId) throws IOException {
    JobID id = JobID.forName(jobId);
    Cluster cluster = new Cluster(this.getConf());
    try {
      Job job = cluster.getJob(id);
      if (job == null) {
        LOG.error("No job found for " + id);
        // TODO: should we throw an exception?
        return;
      }
      if (job.isComplete() || job.isRetired()) {
        return;
      }

      job.killJob();
      LOG.debug("Killed copy job " + id);
    } catch (InterruptedException e) {
      throw new IOException(e);
    }
  }
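
  // Usage sketch: cancel() takes a MapReduce job id in standard form, such as the one execute()
  // records under DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, e.g.
  //   copyJob.cancel("job_1700000000000_0042"); // hypothetical job id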

}