001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.fs;
019
020import java.io.IOException;
021import java.lang.reflect.InvocationTargetException;
022import java.util.Collection;
023import java.util.Objects;
024import java.util.stream.Collectors;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FSDataOutputStream;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.DoNotRetryIOException;
030import org.apache.hadoop.hbase.HBaseIOException;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.TableDescriptor;
034import org.apache.hadoop.hbase.util.CommonFSUtils;
035import org.apache.hadoop.hbase.util.ReflectionUtils;
036import org.apache.hadoop.hdfs.DistributedFileSystem;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041@InterfaceAudience.Private
042public final class ErasureCodingUtils {
043
044  private ErasureCodingUtils() {
045  }
046
047  private static final Logger LOG = LoggerFactory.getLogger(ErasureCodingUtils.class);
048
049  /**
050   * Runs checks against the FileSystem, verifying that HDFS is supported and the policy is
051   * available, enabled, and works with a simple write.
052   */
053  public static void verifySupport(Configuration conf, String policy) throws HBaseIOException {
054    DistributedFileSystem dfs = getDfs(conf);
055    checkAvailable(dfs, policy);
056
057    // Enable the policy on a test directory. Try writing ot it to ensure that HDFS allows it
058    // This acts as a safeguard against topology issues (not enough nodes for policy, etc) and
059    // anything else. This is otherwise hard to validate more directly.
060    Path globalTempDir = new Path(conf.get(HConstants.HBASE_DIR), HConstants.HBASE_TEMP_DIRECTORY);
061    Path currentTempDir = createTempDir(dfs, globalTempDir);
062    try {
063      setPolicy(dfs, currentTempDir, policy);
064      try (FSDataOutputStream out = dfs.create(new Path(currentTempDir, "test.out"))) {
065        out.writeUTF("Testing " + policy);
066      }
067    } catch (IOException e) {
068      throw new DoNotRetryIOException("Failed write test for EC policy. Check cause or logs", e);
069    } finally {
070      try {
071        dfs.delete(currentTempDir, true);
072      } catch (IOException e) {
073        LOG.warn("Failed to delete temp path for ec test", e);
074      }
075    }
076  }
077
078  private static Path createTempDir(FileSystem fs, Path tempDir) throws HBaseIOException {
079    Path currentTempDir = new Path(tempDir, "ec-test-" + System.currentTimeMillis());
080    try {
081      fs.mkdirs(currentTempDir);
082      fs.deleteOnExit(currentTempDir);
083    } catch (IOException e) {
084      throw new HBaseIOException("Failed to create test dir for EC write test", e);
085    }
086    return currentTempDir;
087  }
088
089  private static void checkAvailable(DistributedFileSystem dfs, String requestedPolicy)
090    throws HBaseIOException {
091    Collection<Object> policies;
092
093    try {
094      policies = callDfsMethod(dfs, "getAllErasureCodingPolicies");
095    } catch (IOException e) {
096      throw new HBaseIOException("Failed to check for Erasure Coding policy: " + requestedPolicy,
097        e);
098    }
099    for (Object policyInfo : policies) {
100      if (checkPolicyMatch(policyInfo, requestedPolicy)) {
101        return;
102      }
103    }
104    throw new DoNotRetryIOException("Cannot set Erasure Coding policy: " + requestedPolicy
105      + ". Policy not found. Available policies are: " + getPolicyNames(policies));
106  }
107
108  private static boolean checkPolicyMatch(Object policyInfo, String requestedPolicy)
109    throws DoNotRetryIOException {
110    try {
111      String policyName = getPolicyNameFromInfo(policyInfo);
112      if (requestedPolicy.equals(policyName)) {
113        boolean isEnabled = callObjectMethod(policyInfo, "isEnabled");
114        if (!isEnabled) {
115          throw new DoNotRetryIOException("Cannot set Erasure Coding policy: " + requestedPolicy
116            + ". The policy must be enabled, but has state "
117            + callObjectMethod(policyInfo, "getState"));
118        }
119        return true;
120      }
121    } catch (DoNotRetryIOException e) {
122      throw e;
123    } catch (IOException e) {
124      throw new DoNotRetryIOException(
125        "Unable to check for match of Erasure Coding Policy " + policyInfo, e);
126    }
127    return false;
128  }
129
130  private static String getPolicyNameFromInfo(Object policyInfo) throws IOException {
131    Object policy = callObjectMethod(policyInfo, "getPolicy");
132    return callObjectMethod(policy, "getName");
133  }
134
135  private static String getPolicyNames(Collection<Object> policyInfos) {
136    return policyInfos.stream().map(p -> {
137      try {
138        return getPolicyNameFromInfo(p);
139      } catch (IOException e) {
140        LOG.warn("Could not extract policy name from {}", p, e);
141        return "unknown";
142      }
143    }).collect(Collectors.joining(", "));
144  }
145
146  /**
147   * Check if EC policy is different between two descriptors
148   * @return true if a sync is necessary
149   */
150  public static boolean needsSync(TableDescriptor oldDescriptor, TableDescriptor newDescriptor) {
151    String newPolicy = oldDescriptor.getErasureCodingPolicy();
152    String oldPolicy = newDescriptor.getErasureCodingPolicy();
153    return !Objects.equals(oldPolicy, newPolicy);
154  }
155
156  /**
157   * Sync the EC policy state from the newDescriptor onto the FS for the table dir of the provided
158   * table descriptor. If the policy is null, we will remove erasure coding from the FS for the
159   * table dir. If it's non-null, we'll set it to that policy.
160   * @param newDescriptor descriptor containing the policy and table name
161   */
162  public static void sync(FileSystem fs, Path rootDir, TableDescriptor newDescriptor)
163    throws IOException {
164    String newPolicy = newDescriptor.getErasureCodingPolicy();
165    if (newPolicy == null) {
166      unsetPolicy(fs, rootDir, newDescriptor.getTableName());
167    } else {
168      setPolicy(fs, rootDir, newDescriptor.getTableName(), newPolicy);
169    }
170  }
171
172  /**
173   * Sets the EC policy on the table directory for the specified table
174   */
175  public static void setPolicy(FileSystem fs, Path rootDir, TableName tableName, String policy)
176    throws IOException {
177    Path path = CommonFSUtils.getTableDir(rootDir, tableName);
178    setPolicy(fs, path, policy);
179  }
180
181  /**
182   * Sets the EC policy on the path
183   */
184  public static void setPolicy(FileSystem fs, Path path, String policy) throws IOException {
185    callDfsMethod(getDfs(fs), "setErasureCodingPolicy", path, policy);
186  }
187
188  /**
189   * Unsets any EC policy specified on the path.
190   */
191  public static void unsetPolicy(FileSystem fs, Path rootDir, TableName tableName)
192    throws IOException {
193    DistributedFileSystem dfs = getDfs(fs);
194    Path path = CommonFSUtils.getTableDir(rootDir, tableName);
195    if (getPolicyNameForPath(dfs, path) == null) {
196      LOG.warn("No EC policy set for path {}, nothing to unset", path);
197      return;
198    }
199    callDfsMethod(dfs, "unsetErasureCodingPolicy", path);
200  }
201
202  public static void enablePolicy(FileSystem fs, String policy) throws IOException {
203    callDfsMethod(getDfs(fs), "enableErasureCodingPolicy", policy);
204  }
205
206  private static DistributedFileSystem getDfs(Configuration conf) throws HBaseIOException {
207    try {
208      return getDfs(FileSystem.get(conf));
209    } catch (DoNotRetryIOException e) {
210      throw e;
211    } catch (IOException e) {
212      throw new HBaseIOException("Failed to get FileSystem from conf", e);
213    }
214
215  }
216
217  private static DistributedFileSystem getDfs(FileSystem fs) throws DoNotRetryIOException {
218    if (!(fs instanceof DistributedFileSystem)) {
219      throw new DoNotRetryIOException(
220        "Cannot manage Erasure Coding policy. Erasure Coding is only available on HDFS, but fs is "
221          + fs.getClass().getSimpleName());
222    }
223    return (DistributedFileSystem) fs;
224  }
225
226  public static String getPolicyNameForPath(DistributedFileSystem dfs, Path path)
227    throws IOException {
228    Object policy = callDfsMethod(dfs, "getErasureCodingPolicy", path);
229    if (policy == null) {
230      return null;
231    }
232    return callObjectMethod(policy, "getName");
233  }
234
235  private interface ThrowingObjectSupplier {
236    Object run() throws IOException;
237  }
238
239  private static <T> T callDfsMethod(DistributedFileSystem dfs, String name, Object... params)
240    throws IOException {
241    return callObjectMethod(dfs, name, params);
242  }
243
244  private static <T> T callObjectMethod(Object object, String name, Object... params)
245    throws IOException {
246    return unwrapInvocationException(() -> ReflectionUtils.invokeMethod(object, name, params));
247  }
248
249  private static <T> T unwrapInvocationException(ThrowingObjectSupplier runnable)
250    throws IOException {
251    try {
252      return (T) runnable.run();
253    } catch (UnsupportedOperationException e) {
254      if (e.getCause() instanceof InvocationTargetException) {
255        Throwable cause = e.getCause().getCause();
256        if (cause instanceof IOException) {
257          throw (IOException) cause;
258        } else if (cause instanceof RuntimeException) {
259          throw (RuntimeException) cause;
260        }
261      }
262      throw e;
263    }
264  }
265}