/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionAsTable;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

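/**
 * Test that a MOB compaction which fails part-way through does not change the set of store files
 * and mob files of the region.
 */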
@Category(MediumTests.class)
public class TestMobCompactionWithException {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMobCompactionWithException.class);

  @Rule
  public TestName name = new TestName();
  static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithException.class);
  private final static HBaseTestingUtility HTU = new HBaseTestingUtility();
  private static Configuration conf = null;

  private HRegion region = null;
  private TableDescriptor tableDescriptor;
  private ColumnFamilyDescriptor columnFamilyDescriptor;
  private FileSystem fs;

  private static final byte[] COLUMN_FAMILY = fam1;
  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
  private static volatile boolean testException = false;
  private static int rowCount = 100;
  private Table table;

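  // Run MOB compaction in the optimized mode and plug in a compactor subclass that can inject an
  // IOException while the compaction is scanning.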
  @BeforeClass
  public static void setUp() throws Exception {
    conf = HTU.getConfiguration();
    conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
    conf.set(MobStoreEngine.MOB_COMPACTOR_CLASS_KEY, MyMobStoreCompactor.class.getName());
  }

  @After
  public void tearDown() throws Exception {
    region.close();
    this.table.close();
    fs.delete(HTU.getDataTestDir(), true);
  }
  private void createTable(long mobThreshold) throws IOException {
    this.columnFamilyDescriptor =
      ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).setMobEnabled(true)
        .setMobThreshold(mobThreshold).setMaxVersions(1).setBlocksize(500).build();
    this.tableDescriptor =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(TestMobUtils.getTableName(name)))
        .setColumnFamily(columnFamilyDescriptor).build();
    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
    region = HBaseTestingUtility.createRegionAndWAL(regionInfo, HTU.getDataTestDir(), conf,
      tableDescriptor, new MobFileCache(conf));
    this.table = new RegionAsTable(region);
    fs = FileSystem.get(conf);
  }

  /**
   * Test for HBASE-27433: a MOB compaction that fails with an exception must leave the store
   * files and mob files unchanged.
   */
  @Test
  public void testMobStoreFileDeletedWhenCompactException() throws Exception {
    this.createTable(200);
    byte[] dummyData = makeDummyData(1000); // larger than mob threshold
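    // Flush after every put so each row ends up in its own store file and its own mob file.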
    for (int i = 0; i < rowCount; i++) {
      Put p = createPut(i, dummyData);
      table.put(p);
      region.flush(true);
    }

    int storeFileCountBeforeCompact = countStoreFiles();
    int mobFileCountBeforeCompact = countMobFiles();
    long mobFileByteSize = getMobFileByteSize();

    List<HStore> stores = region.getStores();
    assertEquals(1, stores.size());
    HMobStore mobStore = (HMobStore) stores.get(0);
    Compactor<?> compactor = mobStore.getStoreEngine().getCompactor();
    MyMobStoreCompactor myMobStoreCompactor = (MyMobStoreCompactor) compactor;
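    // Cap the mob file size just above the size of a single existing mob file so the compaction
    // has to write more than one new mob file before the injected failure happens.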
    myMobStoreCompactor.setMobFileMaxByteSize(mobFileByteSize + 100);
    testException = true;
    try {
      // Force major compaction
      mobStore.triggerMajorCompaction();
      Optional<CompactionContext> context = mobStore.requestCompaction(HStore.PRIORITY_USER,
        CompactionLifeCycleTracker.DUMMY, User.getCurrent());
      assertTrue(context.isPresent());
      region.compact(context.get(), mobStore, NoLimitThroughputController.INSTANCE,
        User.getCurrent());
      fail("Compaction should have failed with an IOException");
    } catch (IOException e) {
      // expected
    } finally {
      testException = false;
    }

    // When the compaction fails, the store file and mob file counts should be the same as before
    // the compaction.
    assertEquals("After compaction: store files", storeFileCountBeforeCompact, countStoreFiles());
    assertEquals("After compaction: mob file count", mobFileCountBeforeCompact, countMobFiles());
  }

  private int countStoreFiles() throws IOException {
    HStore store = region.getStore(COLUMN_FAMILY);
    return store.getStorefilesCount();
  }

  private int countMobFiles() throws IOException {
    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
      columnFamilyDescriptor.getNameAsString());
    if (fs.exists(mobDirPath)) {
      FileStatus[] files = HTU.getTestFileSystem().listStatus(mobDirPath);
      return files.length;
    }
    return 0;
  }

  private long getMobFileByteSize() throws IOException {
    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
      columnFamilyDescriptor.getNameAsString());
    if (fs.exists(mobDirPath)) {
      FileStatus[] files = HTU.getTestFileSystem().listStatus(mobDirPath);
      if (files.length > 0) {
        return files[0].getLen();
      }
    }
    return 0;
  }

  private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
    Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
    p.setDurability(Durability.SKIP_WAL);
    p.addColumn(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
    return p;
  }

  private byte[] makeDummyData(int size) {
    byte[] dummyData = new byte[size];
    Bytes.random(dummyData);
    return dummyData;
  }

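  /**
   * A {@link DefaultMobStoreCompactor} whose scanner throws an IOException on the last row while
   * {@code testException} is set, simulating a compaction that fails part-way through.
   */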
  public static class MyMobStoreCompactor extends DefaultMobStoreCompactor {
    public MyMobStoreCompactor(Configuration conf, HStore store) {
      super(conf, store);
    }

    public void setMobFileMaxByteSize(long maxByteSize) {
      this.conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, maxByteSize);
    }

    @Override
    protected boolean performCompaction(FileDetails fd, final InternalScanner scanner,
      CellSink writer, long smallestReadPoint, boolean cleanSeqId,
      ThroughputController throughputController, CompactionRequestImpl request,
      CompactionProgress progress) throws IOException {
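      // Wrap the compaction scanner so that, when exception injection is enabled, reading the
      // last row fails with an IOException.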
      InternalScanner wrappedScanner = new InternalScanner() {

        private int count = -1;

        @Override
        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
          count++;
          if (count == rowCount - 1 && testException) {
            count = 0;
            throw new IOException("Inject Error");
          }
          return scanner.next(result, scannerContext);
        }

        @Override
        public void close() throws IOException {
          scanner.close();
        }
      };
      return super.performCompaction(fd, wrappedScanner, writer, smallestReadPoint, cleanSeqId,
        throughputController, request, progress);
    }
  }
}