001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.containsString;
022import static org.hamcrest.Matchers.equalTo;
023import static org.hamcrest.Matchers.greaterThan;
024import static org.hamcrest.Matchers.nullValue;
025import static org.junit.Assert.assertThrows;
026import static org.junit.Assume.assumeTrue;
027
028import java.io.IOException;
029import java.util.function.Function;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.LocatedFileStatus;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.fs.RemoteIterator;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseIOException;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.fs.ErasureCodingUtils;
044import org.apache.hadoop.hbase.io.hfile.HFile;
045import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
046import org.apache.hadoop.hbase.regionserver.HRegion;
047import org.apache.hadoop.hbase.testclassification.MasterTests;
048import org.apache.hadoop.hbase.testclassification.MediumTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.CommonFSUtils;
051import org.apache.hadoop.hbase.util.JVMClusterUtil;
052import org.apache.hadoop.hbase.util.TableDescriptorChecker;
053import org.apache.hadoop.hdfs.DistributedFileSystem;
054import org.junit.AfterClass;
055import org.junit.BeforeClass;
056import org.junit.ClassRule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062@Category({ MasterTests.class, MediumTests.class })
063public class TestManageTableErasureCodingPolicy {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067    HBaseClassTestRule.forClass(TestManageTableErasureCodingPolicy.class);
068  private static final Logger LOG =
069    LoggerFactory.getLogger(TestManageTableErasureCodingPolicy.class);
070
071  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
072  private static final byte[] FAMILY = Bytes.toBytes("a");
073  private static final TableName NON_EC_TABLE = TableName.valueOf("foo");
074  private static final TableDescriptor NON_EC_TABLE_DESC = TableDescriptorBuilder
075    .newBuilder(NON_EC_TABLE).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
076  private static final TableName EC_TABLE = TableName.valueOf("bar");
077  private static final TableDescriptor EC_TABLE_DESC =
078    TableDescriptorBuilder.newBuilder(EC_TABLE).setErasureCodingPolicy("XOR-2-1-1024k")
079      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
080  private static boolean erasureCodingSupported;
081
082  @BeforeClass
083  public static void beforeClass() throws Exception {
084    // enable because we are testing the checks below
085    UTIL.getConfiguration().setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, true);
086    UTIL.startMiniDFSCluster(3); // 3 necessary for XOR-2-1-1024k
087    UTIL.startMiniCluster(1);
088    DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(UTIL.getConfiguration());
089
090    erasureCodingSupported = enableErasureCoding(fs);
091
092    Table table = UTIL.createTable(NON_EC_TABLE_DESC, null);
093    UTIL.loadTable(table, FAMILY);
094    UTIL.flush();
095  }
096
097  private static boolean enableErasureCoding(DistributedFileSystem fs) throws IOException {
098    try {
099      ErasureCodingUtils.enablePolicy(fs, "XOR-2-1-1024k");
100      ErasureCodingUtils.enablePolicy(fs, "RS-6-3-1024k");
101      return true;
102    } catch (UnsupportedOperationException e) {
103      LOG.info(
104        "Current hadoop version does not support erasure coding, only validation tests will run.");
105      return false;
106    }
107  }
108
109  @AfterClass
110  public static void afterClass() throws Exception {
111    UTIL.shutdownMiniCluster();
112    UTIL.shutdownMiniDFSCluster();
113  }
114
115  @Test
116  public void itValidatesPolicyNameForCreate() {
117    runValidatePolicyNameTest(unused -> EC_TABLE_DESC, Admin::createTable);
118  }
119
120  @Test
121  public void itValidatesPolicyNameForAlter() {
122    runValidatePolicyNameTest(admin -> {
123      try {
124        return admin.getDescriptor(NON_EC_TABLE);
125      } catch (IOException e) {
126        throw new RuntimeException(e);
127      }
128    }, Admin::modifyTable);
129  }
130
131  @FunctionalInterface
132  interface ThrowingTableDescriptorConsumer {
133    void accept(Admin admin, TableDescriptor desc) throws IOException;
134  }
135
136  private void runValidatePolicyNameTest(Function<Admin, TableDescriptor> descriptorSupplier,
137    ThrowingTableDescriptorConsumer consumer) {
138    HBaseIOException thrown = assertThrows(HBaseIOException.class, () -> {
139      try (Admin admin = UTIL.getAdmin()) {
140        TableDescriptor desc = descriptorSupplier.apply(admin);
141        consumer.accept(admin,
142          TableDescriptorBuilder.newBuilder(desc).setErasureCodingPolicy("foo").build());
143      }
144    });
145    assertPolicyValidationException(thrown.getMessage(),
146      "Cannot set Erasure Coding policy: foo. Policy not found");
147
148    thrown = assertThrows(HBaseIOException.class, () -> {
149      try (Admin admin = UTIL.getAdmin()) {
150        TableDescriptor desc = descriptorSupplier.apply(admin);
151        consumer.accept(admin,
152          TableDescriptorBuilder.newBuilder(desc).setErasureCodingPolicy("RS-10-4-1024k").build());
153      }
154    });
155    assertPolicyValidationException(thrown.getMessage(),
156      "Cannot set Erasure Coding policy: RS-10-4-1024k. The policy must be enabled");
157
158    // RS-6-3-1024k requires at least 6 datanodes, so should fail write test
159    thrown = assertThrows(HBaseIOException.class, () -> {
160      try (Admin admin = UTIL.getAdmin()) {
161        TableDescriptor desc = descriptorSupplier.apply(admin);
162        consumer.accept(admin,
163          TableDescriptorBuilder.newBuilder(desc).setErasureCodingPolicy("RS-6-3-1024k").build());
164      }
165    });
166    assertPolicyValidationException(thrown.getMessage(), "Failed write test for EC policy");
167  }
168
169  private void assertPolicyValidationException(String message, String expected) {
170    if (erasureCodingSupported) {
171      assertThat(message, containsString(expected));
172    } else {
173      assertThat(message, containsString("Cannot find specified method"));
174    }
175  }
176
177  @Test
178  public void testCreateTableErasureCodingSync() throws IOException {
179    assumeTrue(erasureCodingSupported);
180    try (Admin admin = UTIL.getAdmin()) {
181      recreateTable(admin, EC_TABLE_DESC);
182      UTIL.flush(EC_TABLE);
183      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
184      DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(UTIL.getConfiguration());
185      checkRegionDirAndFilePolicies(dfs, rootDir, EC_TABLE, "XOR-2-1-1024k", "XOR-2-1-1024k");
186    }
187  }
188
189  private void recreateTable(Admin admin, TableDescriptor desc) throws IOException {
190    if (admin.tableExists(desc.getTableName())) {
191      admin.disableTable(desc.getTableName());
192      admin.deleteTable(desc.getTableName());
193    }
194    admin.createTable(desc);
195    try (Table table = UTIL.getConnection().getTable(desc.getTableName())) {
196      UTIL.loadTable(table, FAMILY);
197    }
198  }
199
200  @Test
201  public void testModifyTableErasureCodingSync() throws IOException {
202    assumeTrue(erasureCodingSupported);
203    try (Admin admin = UTIL.getAdmin()) {
204      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
205      DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(UTIL.getConfiguration());
206
207      // start off without EC
208      checkRegionDirAndFilePolicies(dfs, rootDir, NON_EC_TABLE, null, null);
209
210      // add EC
211      TableDescriptor desc = UTIL.getAdmin().getDescriptor(NON_EC_TABLE);
212      TableDescriptor newDesc =
213        TableDescriptorBuilder.newBuilder(desc).setErasureCodingPolicy("XOR-2-1-1024k").build();
214      admin.modifyTable(newDesc);
215
216      // check dirs, but files should not be changed yet
217      checkRegionDirAndFilePolicies(dfs, rootDir, NON_EC_TABLE, "XOR-2-1-1024k", null);
218
219      compactAwayOldFiles(NON_EC_TABLE);
220
221      // expect both dirs and files to be EC now
222      checkRegionDirAndFilePolicies(dfs, rootDir, NON_EC_TABLE, "XOR-2-1-1024k", "XOR-2-1-1024k");
223
224      newDesc = TableDescriptorBuilder.newBuilder(newDesc).setErasureCodingPolicy(null).build();
225      // remove EC now
226      admin.modifyTable(newDesc);
227
228      // dirs should no longer be EC, but old EC files remain
229      checkRegionDirAndFilePolicies(dfs, rootDir, NON_EC_TABLE, null, "XOR-2-1-1024k");
230
231      // compact to rewrite EC files without EC, then run discharger to get rid of the old EC files
232      UTIL.compact(NON_EC_TABLE, true);
233      for (JVMClusterUtil.RegionServerThread regionserver : UTIL.getHBaseCluster()
234        .getLiveRegionServerThreads()) {
235        CompactedHFilesDischarger chore =
236          regionserver.getRegionServer().getCompactedHFilesDischarger();
237        chore.setUseExecutor(false);
238        chore.chore();
239      }
240
241      checkRegionDirAndFilePolicies(dfs, rootDir, NON_EC_TABLE, null, null);
242    }
243  }
244
245  private void compactAwayOldFiles(TableName tableName) throws IOException {
246    LOG.info("Compacting and discharging files for {}", tableName);
247    // compact to rewrit files, then run discharger to get rid of the old files
248    UTIL.compact(tableName, true);
249    for (JVMClusterUtil.RegionServerThread regionserver : UTIL.getHBaseCluster()
250      .getLiveRegionServerThreads()) {
251      CompactedHFilesDischarger chore =
252        regionserver.getRegionServer().getCompactedHFilesDischarger();
253      chore.setUseExecutor(false);
254      chore.chore();
255    }
256  }
257
258  @Test
259  public void testRestoreSnapshot() throws IOException {
260    assumeTrue(erasureCodingSupported);
261    String snapshotName = "testRestoreSnapshot_snap";
262    TableName tableName = TableName.valueOf("testRestoreSnapshot_tbl");
263    try (Admin admin = UTIL.getAdmin()) {
264      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
265      DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(UTIL.getConfiguration());
266
267      // recreate EC test table and load it
268      recreateTable(admin, EC_TABLE_DESC);
269
270      // Take a snapshot, then clone it into a new table
271      admin.snapshot(snapshotName, EC_TABLE);
272      admin.cloneSnapshot(snapshotName, tableName);
273      compactAwayOldFiles(tableName);
274
275      // Verify the new table has the right EC policy
276      checkRegionDirAndFilePolicies(dfs, rootDir, tableName, "XOR-2-1-1024k", "XOR-2-1-1024k");
277
278      // Remove the EC policy from the EC test table, and verify that worked
279      admin.modifyTable(
280        TableDescriptorBuilder.newBuilder(EC_TABLE_DESC).setErasureCodingPolicy(null).build());
281      compactAwayOldFiles(EC_TABLE);
282      checkRegionDirAndFilePolicies(dfs, rootDir, EC_TABLE, null, null);
283
284      // Restore snapshot, and then verify it has the policy again
285      admin.disableTable(EC_TABLE);
286      admin.restoreSnapshot(snapshotName);
287      admin.enableTable(EC_TABLE);
288      compactAwayOldFiles(EC_TABLE);
289      checkRegionDirAndFilePolicies(dfs, rootDir, EC_TABLE, "XOR-2-1-1024k", "XOR-2-1-1024k");
290    }
291  }
292
293  private void checkRegionDirAndFilePolicies(DistributedFileSystem dfs, Path rootDir,
294    TableName testTable, String expectedDirPolicy, String expectedFilePolicy) throws IOException {
295    Path tableDir = CommonFSUtils.getTableDir(rootDir, testTable);
296    checkPolicy(dfs, tableDir, expectedDirPolicy);
297
298    int filesMatched = 0;
299    for (HRegion region : UTIL.getHBaseCluster().getRegions(testTable)) {
300      Path regionDir = new Path(tableDir, region.getRegionInfo().getEncodedName());
301      checkPolicy(dfs, regionDir, expectedDirPolicy);
302      RemoteIterator<LocatedFileStatus> itr = dfs.listFiles(regionDir, true);
303      while (itr.hasNext()) {
304        LocatedFileStatus fileStatus = itr.next();
305        Path path = fileStatus.getPath();
306        if (!HFile.isHFileFormat(dfs, path)) {
307          LOG.info("{} is not an hfile", path);
308          continue;
309        }
310        filesMatched++;
311        checkPolicy(dfs, path, expectedFilePolicy);
312      }
313    }
314    assertThat(filesMatched, greaterThan(0));
315  }
316
317  private void checkPolicy(DistributedFileSystem dfs, Path path, String expectedPolicy)
318    throws IOException {
319    String policy = ErasureCodingUtils.getPolicyNameForPath(dfs, path);
320    if (expectedPolicy == null) {
321      assertThat("policy for " + path, policy, nullValue());
322    } else {
323      assertThat("policy for " + path, policy, equalTo(expectedPolicy));
324    }
325  }
326}