/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerForTest;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

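/**
 * Tests that daughter and merged regions created by split and merge procedures register their
 * store files with the configured {@link StoreFileTracker}, and that splits and merges load the
 * parent regions' files from the tracker rather than from a raw directory listing.
 */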
@Category({ RegionServerTests.class, LargeTests.class })
public class TestMergesSplitsAddToTracker {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMergesSplitsAddToTracker.class);

  private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final String FAMILY_NAME_STR = "info";

  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);

  @Rule
  public TableNameTestRule name = new TableNameTestRule();

  @BeforeClass
  public static void setupClass() throws Exception {
    TEST_UTIL.startMiniCluster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setup() {
    StoreFileTrackerForTest.clear();
  }

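  /**
   * Creates the test table with {@link StoreFileTrackerForTest} as its store file tracker
   * implementation, pre-split at {@code splitKey} when one is given.
   */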
  private TableName createTable(byte[] splitKey) throws IOException {
    TableDescriptor td = TableDescriptorBuilder.newBuilder(name.getTableName())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_NAME))
      .setValue(TRACKER_IMPL, StoreFileTrackerForTest.class.getName()).build();
    if (splitKey != null) {
      TEST_UTIL.getAdmin().createTable(td, new byte[][] { splitKey });
    } else {
      TEST_UTIL.getAdmin().createTable(td);
    }
    return td.getTableName();
  }

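  /**
   * Splits a store file into two daughter regions by hand and verifies that the files committed
   * through {@link HRegionFileSystem#commitDaughterRegion(RegionInfo, List, MasterProcedureEnv)}
   * are all known to the tracker.
   */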
  @Test
  public void testCommitDaughterRegion() throws Exception {
    TableName table = createTable(null);
    // first put some data in order to have a store file created
    putThreeRowsAndFlush(table);
    HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
    HRegionFileSystem regionFS = region.getRegionFileSystem();
    RegionInfo daughterA =
      RegionInfoBuilder.newBuilder(table).setStartKey(region.getRegionInfo().getStartKey())
        .setEndKey(Bytes.toBytes("002")).setSplit(false)
        .setRegionId(region.getRegionInfo().getRegionId() + EnvironmentEdgeManager.currentTime())
        .build();
    RegionInfo daughterB = RegionInfoBuilder.newBuilder(table).setStartKey(Bytes.toBytes("002"))
      .setEndKey(region.getRegionInfo().getEndKey()).setSplit(false)
      .setRegionId(region.getRegionInfo().getRegionId()).build();
    HStoreFile file = (HStoreFile) region.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
    List<Path> splitFilesA = new ArrayList<>();
    StoreFileTracker sft = StoreFileTrackerFactory.create(region.getBaseConf(), true,
      StoreContext.getBuilder()
        .withFamilyStoreDirectoryPath(new Path(regionFS.getRegionDir(), FAMILY_NAME_STR))
        .withRegionFileSystem(regionFS).build());
    splitFilesA.add(regionFS.splitStoreFile(daughterA, Bytes.toString(FAMILY_NAME), file,
      Bytes.toBytes("002"), false, region.getSplitPolicy(), sft));
    List<Path> splitFilesB = new ArrayList<>();
    splitFilesB.add(regionFS.splitStoreFile(daughterB, Bytes.toString(FAMILY_NAME), file,
      Bytes.toBytes("002"), true, region.getSplitPolicy(), sft));
    MasterProcedureEnv env =
      TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getEnvironment();
    Path resultA = regionFS.commitDaughterRegion(daughterA, splitFilesA, env);
    Path resultB = regionFS.commitDaughterRegion(daughterB, splitFilesB, env);
    FileSystem fs = regionFS.getFileSystem();
    verifyFilesAreTracked(resultA, fs);
    verifyFilesAreTracked(resultB, fs);
  }

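  /**
   * Merges store files from two regions into a manually created merge region and verifies that
   * the files committed through {@link HRegionFileSystem#commitMergedRegion(List, MasterProcedureEnv)}
   * are all known to the tracker.
   */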
  @Test
  public void testCommitMergedRegion() throws Exception {
    TableName table = createTable(null);
    // split the table first
    TEST_UTIL.getAdmin().split(table, Bytes.toBytes("002"));
    // Add data and flush to create files in the two different regions
    putThreeRowsAndFlush(table);
    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
    HRegion first = regions.get(0);
    HRegion second = regions.get(1);
    HRegionFileSystem regionFS = first.getRegionFileSystem();

    RegionInfo mergeResult =
      RegionInfoBuilder.newBuilder(table).setStartKey(first.getRegionInfo().getStartKey())
        .setEndKey(second.getRegionInfo().getEndKey()).setSplit(false)
        .setRegionId(first.getRegionInfo().getRegionId() + EnvironmentEdgeManager.currentTime())
        .build();

    HRegionFileSystem mergeFS = HRegionFileSystem.createRegionOnFileSystem(
      TEST_UTIL.getHBaseCluster().getMaster().getConfiguration(), regionFS.getFileSystem(),
      regionFS.getTableDir(), mergeResult);

    List<Path> mergedFiles = new ArrayList<>();
    // merge file from first region
    mergedFiles.add(mergeFileFromRegion(first, mergeFS));
    // merge file from second region
    mergedFiles.add(mergeFileFromRegion(second, mergeFS));
    MasterProcedureEnv env =
      TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getEnvironment();
    mergeFS.commitMergedRegion(mergedFiles, env);
    // validate
    FileSystem fs = first.getRegionFileSystem().getFileSystem();
    Path finalMergeDir =
      new Path(first.getRegionFileSystem().getTableDir(), mergeResult.getEncodedName());
    verifyFilesAreTracked(finalMergeDir, fs);
  }

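  /**
   * Plants an untracked copy of the store file in the store directory, splits the region, and
   * verifies both daughters reference only the tracked file.
   */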
  @Test
  public void testSplitLoadsFromTracker() throws Exception {
    TableName table = createTable(null);
    // Add data and flush to create a store file
    putThreeRowsAndFlush(table);
    HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
    Pair<StoreFileInfo, String> copyResult = copyFileInTheStoreDir(region);
    StoreFileInfo fileInfo = copyResult.getFirst();
    String copyName = copyResult.getSecond();
    // Now split the region
    TEST_UTIL.getAdmin().split(table, Bytes.toBytes("002"));
    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
    HRegion first = regions.get(0);
    validateDaughterRegionsFiles(first, fileInfo.getActiveFileName(), copyName);
    HRegion second = regions.get(1);
    validateDaughterRegionsFiles(second, fileInfo.getActiveFileName(), copyName);
  }

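  /**
   * Plants an untracked copy of a store file in the first region's store directory, merges the
   * two regions, and verifies the merged region references only the tracked file.
   */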
  @Test
  public void testMergeLoadsFromTracker() throws Exception {
    TableName table = createTable(Bytes.toBytes("002"));
    // Add data and flush to create files in the two different regions
    putThreeRowsAndFlush(table);
    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
    HRegion first = regions.get(0);
    Pair<StoreFileInfo, String> copyResult = copyFileInTheStoreDir(first);
    StoreFileInfo fileInfo = copyResult.getFirst();
    String copyName = copyResult.getSecond();
    // Now merge the two regions
    TEST_UTIL.getAdmin()
      .mergeRegionsAsync(new byte[][] { first.getRegionInfo().getEncodedNameAsBytes(),
        regions.get(1).getRegionInfo().getEncodedNameAsBytes() }, true)
      .get(10, TimeUnit.SECONDS);
    regions = TEST_UTIL.getHBaseCluster().getRegions(table);
    HRegion merged = regions.get(0);
    validateDaughterRegionsFiles(merged, fileInfo.getActiveFileName(), copyName);
  }

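  /**
   * Copies the region's single store file back into the store directory under a random name,
   * bypassing the tracker, and returns the tracked file's info plus the untracked copy's name.
   */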
  private Pair<StoreFileInfo, String> copyFileInTheStoreDir(HRegion region) throws IOException {
    Path storeDir = region.getRegionFileSystem().getStoreDir(FAMILY_NAME_STR);
    // load the single tracked file
    HRegionFileSystem regionFs = region.getRegionFileSystem();
    StoreFileTracker sft = StoreFileTrackerFactory.create(region.getBaseConf(), false,
      StoreContext.getBuilder().withFamilyStoreDirectoryPath(storeDir)
        .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of(FAMILY_NAME))
        .withRegionFileSystem(regionFs).build());
    List<StoreFileInfo> infos = sft.load();
    StoreFileInfo fileInfo = infos.get(0);
    // make a copy of the valid file straight into the store dir, so that it's not tracked.
    String copyName = UUID.randomUUID().toString().replaceAll("-", "");
    Path copy = new Path(storeDir, copyName);
    FileUtil.copy(region.getFilesystem(), fileInfo.getFileStatus(), region.getFilesystem(), copy,
      false, false, TEST_UTIL.getConfiguration());
    return new Pair<>(fileInfo, copyName);
  }

  private void validateDaughterRegionsFiles(HRegion region, String originalFileName,
    String untrackedFile) throws IOException {
    // verify the tracker lists the original file but has no reference to the untracked copy
    HRegionFileSystem regionFs = region.getRegionFileSystem();
    StoreFileTracker sft = StoreFileTrackerFactory.create(regionFs.getFileSystem().getConf(), false,
      StoreContext.getBuilder()
        .withFamilyStoreDirectoryPath(new Path(regionFs.getRegionDir(), FAMILY_NAME_STR))
        .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of(FAMILY_NAME))
        .withRegionFileSystem(regionFs).build());
    List<StoreFileInfo> infos = sft.load();
    assertThat(infos, everyItem(hasProperty("activeFileName", not(containsString(untrackedFile)))));
    assertThat(infos, hasItem(hasProperty("activeFileName", containsString(originalFileName))));
  }

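  /**
   * Asserts that every file under the region's family directory was recorded by the test tracker.
   */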
  private void verifyFilesAreTracked(Path regionDir, FileSystem fs) throws Exception {
    for (FileStatus f : fs.listStatus(new Path(regionDir, FAMILY_NAME_STR))) {
      assertTrue(
        StoreFileTrackerForTest.tracked(regionDir.getName(), FAMILY_NAME_STR, f.getPath()));
    }
  }

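  /**
   * Writes a reference to {@code regionToMerge}'s single store file under the merge region's
   * family directory and returns its path.
   */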
  private Path mergeFileFromRegion(HRegion regionToMerge, HRegionFileSystem mergeFS)
    throws IOException {
    HStoreFile file = (HStoreFile) regionToMerge.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
    HRegionFileSystem regionFs = regionToMerge.getRegionFileSystem();
    StoreFileTracker sft = StoreFileTrackerFactory.create(regionToMerge.getBaseConf(), true,
      StoreContext.getBuilder()
        .withFamilyStoreDirectoryPath(new Path(regionFs.getRegionDir(), FAMILY_NAME_STR))
        .withRegionFileSystem(regionFs).build());
    return mergeFS.mergeStoreFile(regionToMerge.getRegionInfo(), Bytes.toString(FAMILY_NAME), file,
      sft);
  }

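  /** Puts rows 001 through 003 and flushes, creating a store file in each region holding data. */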
  private void putThreeRowsAndFlush(TableName table) throws IOException {
    try (Table tbl = TEST_UTIL.getConnection().getTable(table)) {
      byte[] qualifier = Bytes.toBytes("1");
      Put put = new Put(Bytes.toBytes("001"));
      put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(1));
      tbl.put(put);
      put = new Put(Bytes.toBytes("002"));
      put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(2));
      tbl.put(put);
      put = new Put(Bytes.toBytes("003"));
      put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(3));
      tbl.put(put);
    }
    TEST_UTIL.flush(table);
  }
}