/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.Matchers.containsString;
023import static org.hamcrest.Matchers.everyItem;
024import static org.hamcrest.Matchers.hasItem;
025import static org.hamcrest.Matchers.hasProperty;
026import static org.hamcrest.Matchers.not;
027import static org.junit.Assert.assertTrue;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.UUID;
033import java.util.concurrent.TimeUnit;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.FileUtil;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtility;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.TableNameTestRule;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.RegionInfoBuilder;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.client.TableDescriptor;
048import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
049import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
050import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerForTest;
051import org.apache.hadoop.hbase.testclassification.LargeTests;
052import org.apache.hadoop.hbase.testclassification.RegionServerTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
055import org.apache.hadoop.hbase.util.Pair;
056import org.junit.AfterClass;
057import org.junit.Before;
058import org.junit.BeforeClass;
059import org.junit.ClassRule;
060import org.junit.Rule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063
064@Category({ RegionServerTests.class, LargeTests.class })
065public class TestMergesSplitsAddToTracker {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069    HBaseClassTestRule.forClass(TestMergesSplitsAddToTracker.class);
070
071  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
072
073  private static final String FAMILY_NAME_STR = "info";
074
075  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
076
077  @Rule
078  public TableNameTestRule name = new TableNameTestRule();
079
080  @BeforeClass
081  public static void setupClass() throws Exception {
082    TEST_UTIL.startMiniCluster();
083  }
084
085  @AfterClass
086  public static void afterClass() throws Exception {
087    TEST_UTIL.shutdownMiniCluster();
088  }
089
090  @Before
091  public void setup() {
092    StoreFileTrackerForTest.clear();
093  }
094
095  private TableName createTable(byte[] splitKey) throws IOException {
096    TableDescriptor td = TableDescriptorBuilder.newBuilder(name.getTableName())
097      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_NAME))
098      .setValue(TRACKER_IMPL, StoreFileTrackerForTest.class.getName()).build();
099    if (splitKey != null) {
100      TEST_UTIL.getAdmin().createTable(td, new byte[][] { splitKey });
101    } else {
102      TEST_UTIL.getAdmin().createTable(td);
103    }
104    return td.getTableName();
105  }
106
107  @Test
108  public void testCommitDaughterRegion() throws Exception {
109    TableName table = createTable(null);
110    // first put some data in order to have a store file created
111    putThreeRowsAndFlush(table);
112    HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
113    HRegionFileSystem regionFS = region.getStores().get(0).getRegionFileSystem();
114    RegionInfo daughterA =
115      RegionInfoBuilder.newBuilder(table).setStartKey(region.getRegionInfo().getStartKey())
116        .setEndKey(Bytes.toBytes("002")).setSplit(false)
117        .setRegionId(region.getRegionInfo().getRegionId() + EnvironmentEdgeManager.currentTime())
118        .build();
119    RegionInfo daughterB = RegionInfoBuilder.newBuilder(table).setStartKey(Bytes.toBytes("002"))
120      .setEndKey(region.getRegionInfo().getEndKey()).setSplit(false)
121      .setRegionId(region.getRegionInfo().getRegionId()).build();
122    HStoreFile file = (HStoreFile) region.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
123    List<Path> splitFilesA = new ArrayList<>();
124    splitFilesA.add(regionFS.splitStoreFile(daughterA, Bytes.toString(FAMILY_NAME), file,
125      Bytes.toBytes("002"), false, region.getSplitPolicy()));
126    List<Path> splitFilesB = new ArrayList<>();
127    splitFilesB.add(regionFS.splitStoreFile(daughterB, Bytes.toString(FAMILY_NAME), file,
128      Bytes.toBytes("002"), true, region.getSplitPolicy()));
129    MasterProcedureEnv env =
130      TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getEnvironment();
131    Path resultA = regionFS.commitDaughterRegion(daughterA, splitFilesA, env);
132    Path resultB = regionFS.commitDaughterRegion(daughterB, splitFilesB, env);
133    FileSystem fs = regionFS.getFileSystem();
134    verifyFilesAreTracked(resultA, fs);
135    verifyFilesAreTracked(resultB, fs);
136  }
137
138  @Test
139  public void testCommitMergedRegion() throws Exception {
140    TableName table = createTable(null);
141    // splitting the table first
142    split(table, Bytes.toBytes("002"));
143    // Add data and flush to create files in the two different regions
144    putThreeRowsAndFlush(table);
145    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
146    HRegion first = regions.get(0);
147    HRegion second = regions.get(1);
148    HRegionFileSystem regionFS = first.getRegionFileSystem();
149
150    RegionInfo mergeResult =
151      RegionInfoBuilder.newBuilder(table).setStartKey(first.getRegionInfo().getStartKey())
152        .setEndKey(second.getRegionInfo().getEndKey()).setSplit(false)
153        .setRegionId(first.getRegionInfo().getRegionId() + EnvironmentEdgeManager.currentTime())
154        .build();
155
156    HRegionFileSystem mergeFS = HRegionFileSystem.createRegionOnFileSystem(
157      TEST_UTIL.getHBaseCluster().getMaster().getConfiguration(), regionFS.getFileSystem(),
158      regionFS.getTableDir(), mergeResult);
159
160    List<Path> mergedFiles = new ArrayList<>();
161    // merge file from first region
162    mergedFiles.add(mergeFileFromRegion(first, mergeFS));
163    // merge file from second region
164    mergedFiles.add(mergeFileFromRegion(second, mergeFS));
165    MasterProcedureEnv env =
166      TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getEnvironment();
167    mergeFS.commitMergedRegion(mergedFiles, env);
168    // validate
169    FileSystem fs = first.getRegionFileSystem().getFileSystem();
170    Path finalMergeDir =
171      new Path(first.getRegionFileSystem().getTableDir(), mergeResult.getEncodedName());
172    verifyFilesAreTracked(finalMergeDir, fs);
173  }
174
175  @Test
176  public void testSplitLoadsFromTracker() throws Exception {
177    TableName table = createTable(null);
178    // Add data and flush to create files in the two different regions
179    putThreeRowsAndFlush(table);
180    HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
181    Pair<StoreFileInfo, String> copyResult = copyFileInTheStoreDir(region);
182    StoreFileInfo fileInfo = copyResult.getFirst();
183    String copyName = copyResult.getSecond();
184    // Now splits the region
185    split(table, Bytes.toBytes("002"));
186    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
187    HRegion first = regions.get(0);
188    validateDaughterRegionsFiles(first, fileInfo.getActiveFileName(), copyName);
189    HRegion second = regions.get(1);
190    validateDaughterRegionsFiles(second, fileInfo.getActiveFileName(), copyName);
191  }
192
193  private void split(TableName table, byte[] splitKey) throws IOException {
194    TEST_UTIL.getAdmin().split(table, splitKey);
195    // wait until split is done
196    TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getHBaseCluster().getRegions(table).size() == 2);
197  }
198
199  @Test
200  public void testMergeLoadsFromTracker() throws Exception {
201    TableName table = createTable(Bytes.toBytes("002"));
202    // Add data and flush to create files in the two different regions
203    putThreeRowsAndFlush(table);
204    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
205    HRegion first = regions.get(0);
206    Pair<StoreFileInfo, String> copyResult = copyFileInTheStoreDir(first);
207    StoreFileInfo fileInfo = copyResult.getFirst();
208    String copyName = copyResult.getSecond();
209    // Now merges the first two regions
210    TEST_UTIL.getAdmin()
211      .mergeRegionsAsync(new byte[][] { first.getRegionInfo().getEncodedNameAsBytes(),
212        regions.get(1).getRegionInfo().getEncodedNameAsBytes() }, true)
213      .get(10, TimeUnit.SECONDS);
214    regions = TEST_UTIL.getHBaseCluster().getRegions(table);
215    HRegion merged = regions.get(0);
216    validateDaughterRegionsFiles(merged, fileInfo.getActiveFileName(), copyName);
217  }
218
219  private Pair<StoreFileInfo, String> copyFileInTheStoreDir(HRegion region) throws IOException {
220    Path storeDir = region.getRegionFileSystem().getStoreDir("info");
221    // gets the single file
222    StoreFileInfo fileInfo = region.getRegionFileSystem().getStoreFiles("info").get(0);
223    // make a copy of the valid file staight into the store dir, so that it's not tracked.
224    String copyName = UUID.randomUUID().toString().replaceAll("-", "");
225    Path copy = new Path(storeDir, copyName);
226    FileUtil.copy(region.getFilesystem(), fileInfo.getFileStatus(), region.getFilesystem(), copy,
227      false, false, TEST_UTIL.getConfiguration());
228    return new Pair<>(fileInfo, copyName);
229  }
230
231  private void validateDaughterRegionsFiles(HRegion region, String originalFileName,
232    String untrackedFile) throws IOException {
233    // verify there's no link for the untracked, copied file in first region
234    List<StoreFileInfo> infos = region.getRegionFileSystem().getStoreFiles("info");
235    assertThat(infos, everyItem(hasProperty("activeFileName", not(containsString(untrackedFile)))));
236    assertThat(infos, hasItem(hasProperty("activeFileName", containsString(originalFileName))));
237  }
238
239  private void verifyFilesAreTracked(Path regionDir, FileSystem fs) throws Exception {
240    for (FileStatus f : fs.listStatus(new Path(regionDir, FAMILY_NAME_STR))) {
241      assertTrue(
242        StoreFileTrackerForTest.tracked(regionDir.getName(), FAMILY_NAME_STR, f.getPath()));
243    }
244  }
245
246  private Path mergeFileFromRegion(HRegion regionToMerge, HRegionFileSystem mergeFS)
247    throws IOException {
248    HStoreFile file = (HStoreFile) regionToMerge.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
249    return mergeFS.mergeStoreFile(regionToMerge.getRegionInfo(), Bytes.toString(FAMILY_NAME), file);
250  }
251
252  private void putThreeRowsAndFlush(TableName table) throws IOException {
253    Table tbl = TEST_UTIL.getConnection().getTable(table);
254    Put put = new Put(Bytes.toBytes("001"));
255    byte[] qualifier = Bytes.toBytes("1");
256    put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(1));
257    tbl.put(put);
258    put = new Put(Bytes.toBytes("002"));
259    put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(2));
260    tbl.put(put);
261    put = new Put(Bytes.toBytes("003"));
262    put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(2));
263    tbl.put(put);
264    TEST_UTIL.flush(table);
265  }
266}