001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.fs; 019 020import java.io.IOException; 021import java.lang.reflect.InvocationTargetException; 022import java.util.Collection; 023import java.util.Objects; 024import java.util.stream.Collectors; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FSDataOutputStream; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.DoNotRetryIOException; 030import org.apache.hadoop.hbase.HBaseIOException; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.TableDescriptor; 034import org.apache.hadoop.hbase.util.CommonFSUtils; 035import org.apache.hadoop.hbase.util.ReflectionUtils; 036import org.apache.hadoop.hdfs.DistributedFileSystem; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041@InterfaceAudience.Private 042public final class ErasureCodingUtils { 043 044 private ErasureCodingUtils() { 045 } 046 047 private static final Logger LOG = LoggerFactory.getLogger(ErasureCodingUtils.class); 048 049 /** 050 * Runs checks against the FileSystem, verifying that HDFS is supported and the policy is 051 * available, enabled, and works with a simple write. 052 */ 053 public static void verifySupport(Configuration conf, String policy) throws HBaseIOException { 054 DistributedFileSystem dfs = getDfs(conf); 055 checkAvailable(dfs, policy); 056 057 // Enable the policy on a test directory. Try writing ot it to ensure that HDFS allows it 058 // This acts as a safeguard against topology issues (not enough nodes for policy, etc) and 059 // anything else. This is otherwise hard to validate more directly. 060 Path globalTempDir = new Path(conf.get(HConstants.HBASE_DIR), HConstants.HBASE_TEMP_DIRECTORY); 061 Path currentTempDir = createTempDir(dfs, globalTempDir); 062 try { 063 setPolicy(dfs, currentTempDir, policy); 064 try (FSDataOutputStream out = dfs.create(new Path(currentTempDir, "test.out"))) { 065 out.writeUTF("Testing " + policy); 066 } 067 } catch (IOException e) { 068 throw new DoNotRetryIOException("Failed write test for EC policy. Check cause or logs", e); 069 } finally { 070 try { 071 dfs.delete(currentTempDir, true); 072 } catch (IOException e) { 073 LOG.warn("Failed to delete temp path for ec test", e); 074 } 075 } 076 } 077 078 private static Path createTempDir(FileSystem fs, Path tempDir) throws HBaseIOException { 079 Path currentTempDir = new Path(tempDir, "ec-test-" + System.currentTimeMillis()); 080 try { 081 fs.mkdirs(currentTempDir); 082 fs.deleteOnExit(currentTempDir); 083 } catch (IOException e) { 084 throw new HBaseIOException("Failed to create test dir for EC write test", e); 085 } 086 return currentTempDir; 087 } 088 089 private static void checkAvailable(DistributedFileSystem dfs, String requestedPolicy) 090 throws HBaseIOException { 091 Collection<Object> policies; 092 093 try { 094 policies = callDfsMethod(dfs, "getAllErasureCodingPolicies"); 095 } catch (IOException e) { 096 throw new HBaseIOException("Failed to check for Erasure Coding policy: " + requestedPolicy, 097 e); 098 } 099 for (Object policyInfo : policies) { 100 if (checkPolicyMatch(policyInfo, requestedPolicy)) { 101 return; 102 } 103 } 104 throw new DoNotRetryIOException("Cannot set Erasure Coding policy: " + requestedPolicy 105 + ". Policy not found. Available policies are: " + getPolicyNames(policies)); 106 } 107 108 private static boolean checkPolicyMatch(Object policyInfo, String requestedPolicy) 109 throws DoNotRetryIOException { 110 try { 111 String policyName = getPolicyNameFromInfo(policyInfo); 112 if (requestedPolicy.equals(policyName)) { 113 boolean isEnabled = callObjectMethod(policyInfo, "isEnabled"); 114 if (!isEnabled) { 115 throw new DoNotRetryIOException("Cannot set Erasure Coding policy: " + requestedPolicy 116 + ". The policy must be enabled, but has state " 117 + callObjectMethod(policyInfo, "getState")); 118 } 119 return true; 120 } 121 } catch (DoNotRetryIOException e) { 122 throw e; 123 } catch (IOException e) { 124 throw new DoNotRetryIOException( 125 "Unable to check for match of Erasure Coding Policy " + policyInfo, e); 126 } 127 return false; 128 } 129 130 private static String getPolicyNameFromInfo(Object policyInfo) throws IOException { 131 Object policy = callObjectMethod(policyInfo, "getPolicy"); 132 return callObjectMethod(policy, "getName"); 133 } 134 135 private static String getPolicyNames(Collection<Object> policyInfos) { 136 return policyInfos.stream().map(p -> { 137 try { 138 return getPolicyNameFromInfo(p); 139 } catch (IOException e) { 140 LOG.warn("Could not extract policy name from {}", p, e); 141 return "unknown"; 142 } 143 }).collect(Collectors.joining(", ")); 144 } 145 146 /** 147 * Check if EC policy is different between two descriptors 148 * @return true if a sync is necessary 149 */ 150 public static boolean needsSync(TableDescriptor oldDescriptor, TableDescriptor newDescriptor) { 151 String newPolicy = oldDescriptor.getErasureCodingPolicy(); 152 String oldPolicy = newDescriptor.getErasureCodingPolicy(); 153 return !Objects.equals(oldPolicy, newPolicy); 154 } 155 156 /** 157 * Sync the EC policy state from the newDescriptor onto the FS for the table dir of the provided 158 * table descriptor. If the policy is null, we will remove erasure coding from the FS for the 159 * table dir. If it's non-null, we'll set it to that policy. 160 * @param newDescriptor descriptor containing the policy and table name 161 */ 162 public static void sync(FileSystem fs, Path rootDir, TableDescriptor newDescriptor) 163 throws IOException { 164 String newPolicy = newDescriptor.getErasureCodingPolicy(); 165 if (newPolicy == null) { 166 unsetPolicy(fs, rootDir, newDescriptor.getTableName()); 167 } else { 168 setPolicy(fs, rootDir, newDescriptor.getTableName(), newPolicy); 169 } 170 } 171 172 /** 173 * Sets the EC policy on the table directory for the specified table 174 */ 175 public static void setPolicy(FileSystem fs, Path rootDir, TableName tableName, String policy) 176 throws IOException { 177 Path path = CommonFSUtils.getTableDir(rootDir, tableName); 178 setPolicy(fs, path, policy); 179 } 180 181 /** 182 * Sets the EC policy on the path 183 */ 184 public static void setPolicy(FileSystem fs, Path path, String policy) throws IOException { 185 callDfsMethod(getDfs(fs), "setErasureCodingPolicy", path, policy); 186 } 187 188 /** 189 * Unsets any EC policy specified on the path. 190 */ 191 public static void unsetPolicy(FileSystem fs, Path rootDir, TableName tableName) 192 throws IOException { 193 DistributedFileSystem dfs = getDfs(fs); 194 Path path = CommonFSUtils.getTableDir(rootDir, tableName); 195 if (getPolicyNameForPath(dfs, path) == null) { 196 LOG.warn("No EC policy set for path {}, nothing to unset", path); 197 return; 198 } 199 callDfsMethod(dfs, "unsetErasureCodingPolicy", path); 200 } 201 202 public static void enablePolicy(FileSystem fs, String policy) throws IOException { 203 callDfsMethod(getDfs(fs), "enableErasureCodingPolicy", policy); 204 } 205 206 private static DistributedFileSystem getDfs(Configuration conf) throws HBaseIOException { 207 try { 208 return getDfs(FileSystem.get(conf)); 209 } catch (DoNotRetryIOException e) { 210 throw e; 211 } catch (IOException e) { 212 throw new HBaseIOException("Failed to get FileSystem from conf", e); 213 } 214 215 } 216 217 private static DistributedFileSystem getDfs(FileSystem fs) throws DoNotRetryIOException { 218 if (!(fs instanceof DistributedFileSystem)) { 219 throw new DoNotRetryIOException( 220 "Cannot manage Erasure Coding policy. Erasure Coding is only available on HDFS, but fs is " 221 + fs.getClass().getSimpleName()); 222 } 223 return (DistributedFileSystem) fs; 224 } 225 226 public static String getPolicyNameForPath(DistributedFileSystem dfs, Path path) 227 throws IOException { 228 Object policy = callDfsMethod(dfs, "getErasureCodingPolicy", path); 229 if (policy == null) { 230 return null; 231 } 232 return callObjectMethod(policy, "getName"); 233 } 234 235 private interface ThrowingObjectSupplier { 236 Object run() throws IOException; 237 } 238 239 private static <T> T callDfsMethod(DistributedFileSystem dfs, String name, Object... params) 240 throws IOException { 241 return callObjectMethod(dfs, name, params); 242 } 243 244 private static <T> T callObjectMethod(Object object, String name, Object... params) 245 throws IOException { 246 return unwrapInvocationException(() -> ReflectionUtils.invokeMethod(object, name, params)); 247 } 248 249 private static <T> T unwrapInvocationException(ThrowingObjectSupplier runnable) 250 throws IOException { 251 try { 252 return (T) runnable.run(); 253 } catch (UnsupportedOperationException e) { 254 if (e.getCause() instanceof InvocationTargetException) { 255 Throwable cause = e.getCause().getCause(); 256 if (cause instanceof IOException) { 257 throw (IOException) cause; 258 } else if (cause instanceof RuntimeException) { 259 throw (RuntimeException) cause; 260 } 261 } 262 throw e; 263 } 264 } 265}