001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
092
093/**
094 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of the region server's
095 * bullkLoad functionality.
096 */
097@RunWith(Parameterized.class)
098@Category({ RegionServerTests.class, LargeTests.class })
099public class TestHRegionServerBulkLoad {
100
101  @ClassRule
102  public static final HBaseClassTestRule CLASS_RULE =
103    HBaseClassTestRule.forClass(TestHRegionServerBulkLoad.class);
104
105  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
106  protected static HBaseTestingUtil UTIL = new HBaseTestingUtil();
107  protected final static Configuration conf = UTIL.getConfiguration();
108  protected final static byte[] QUAL = Bytes.toBytes("qual");
109  protected final static int NUM_CFS = 10;
110  private int sleepDuration;
111  public static int BLOCKSIZE = 64 * 1024;
112  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
113
114  protected final static byte[][] families = new byte[NUM_CFS][];
115  static {
116    for (int i = 0; i < NUM_CFS; i++) {
117      families[i] = Bytes.toBytes(family(i));
118    }
119  }
120
121  @Parameters
122  public static final Collection<Object[]> parameters() {
123    int[] sleepDurations = new int[] { 0, 30000 };
124    List<Object[]> configurations = new ArrayList<>();
125    for (int i : sleepDurations) {
126      configurations.add(new Object[] { i });
127    }
128    return configurations;
129  }
130
131  public TestHRegionServerBulkLoad(int duration) {
132    this.sleepDuration = duration;
133  }
134
135  @BeforeClass
136  public static void setUpBeforeClass() throws Exception {
137    conf.setInt("hbase.rpc.timeout", 10 * 1000);
138  }
139
140  /**
141   * Create a rowkey compatible with
142   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
143   */
144  public static byte[] rowkey(int i) {
145    return Bytes.toBytes(String.format("row_%08d", i));
146  }
147
148  static String family(int i) {
149    return String.format("family_%04d", i);
150  }
151
152  /**
153   * Create an HFile with the given number of rows with a specified value.
154   */
155  public static void createHFile(FileSystem fs, Path path, byte[] family, byte[] qualifier,
156    byte[] value, int numRows) throws IOException {
157    HFileContext context =
158      new HFileContextBuilder().withBlockSize(BLOCKSIZE).withCompression(COMPRESSION).build();
159    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
160      .withFileContext(context).create();
161    long now = EnvironmentEdgeManager.currentTime();
162    try {
163      // subtract 2 since iterateOnSplits doesn't include boundary keys
164      for (int i = 0; i < numRows; i++) {
165        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
166        writer.append(kv);
167      }
168      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now));
169    } finally {
170      writer.close();
171    }
172  }
173
174  /**
175   * Thread that does full scans of the table looking for any partially completed rows. Each
176   * iteration of this loads 10 hdfs files, which occupies 5 file open file handles. So every 10
177   * iterations (500 file handles) it does a region compaction to reduce the number of open file
178   * handles.
179   */
180  public static class AtomicHFileLoader extends RepeatingTestThread {
181    final AtomicLong numBulkLoads = new AtomicLong();
182    final AtomicLong numCompactions = new AtomicLong();
183    private TableName tableName;
184
185    public AtomicHFileLoader(TableName tableName, TestContext ctx, byte targetFamilies[][])
186      throws IOException {
187      super(ctx);
188      this.tableName = tableName;
189    }
190
191    @Override
192    public void doAnAction() throws Exception {
193      long iteration = numBulkLoads.getAndIncrement();
194      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));
195
196      // create HFiles for different column families
197      FileSystem fs = UTIL.getTestFileSystem();
198      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
199      Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
200      for (int i = 0; i < NUM_CFS; i++) {
201        Path hfile = new Path(dir, family(i));
202        byte[] fam = Bytes.toBytes(family(i));
203        createHFile(fs, hfile, fam, QUAL, val, 1000);
204        family2Files.put(fam, Collections.singletonList(hfile));
205      }
206      // bulk load HFiles
207      BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(tableName, family2Files);
208      final Connection conn = UTIL.getConnection();
209      // Periodically do compaction to reduce the number of open file handles.
210      if (numBulkLoads.get() % 5 == 0) {
211        // 5 * 50 = 250 open file handles!
212        try (RegionLocator locator = conn.getRegionLocator(tableName)) {
213          HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes("aaa"), true);
214          conn.getAdmin().compactRegion(loc.getRegion().getRegionName());
215          numCompactions.incrementAndGet();
216        }
217      }
218    }
219  }
220
221  public static class MyObserver implements RegionCoprocessor, RegionObserver {
222    static int sleepDuration;
223
224    @Override
225    public Optional<RegionObserver> getRegionObserver() {
226      return Optional.of(this);
227    }
228
229    @Override
230    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> e,
231      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
232      CompactionRequest request) throws IOException {
233      try {
234        Thread.sleep(sleepDuration);
235      } catch (InterruptedException ie) {
236        IOException ioe = new InterruptedIOException();
237        ioe.initCause(ie);
238        throw ioe;
239      }
240      return scanner;
241    }
242  }
243
244  /**
245   * Thread that does full scans of the table looking for any partially completed rows.
246   */
247  public static class AtomicScanReader extends RepeatingTestThread {
248    byte targetFamilies[][];
249    Table table;
250    AtomicLong numScans = new AtomicLong();
251    AtomicLong numRowsScanned = new AtomicLong();
252    TableName TABLE_NAME;
253
254    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx, byte targetFamilies[][])
255      throws IOException {
256      super(ctx);
257      this.TABLE_NAME = TABLE_NAME;
258      this.targetFamilies = targetFamilies;
259      table = UTIL.getConnection().getTable(TABLE_NAME);
260    }
261
262    @Override
263    public void doAnAction() throws Exception {
264      Scan s = new Scan();
265      for (byte[] family : targetFamilies) {
266        s.addFamily(family);
267      }
268      ResultScanner scanner = table.getScanner(s);
269
270      for (Result res : scanner) {
271        byte[] lastRow = null, lastFam = null, lastQual = null;
272        byte[] gotValue = null;
273        for (byte[] family : targetFamilies) {
274          byte qualifier[] = QUAL;
275          byte thisValue[] = res.getValue(family, qualifier);
276          if (gotValue != null && thisValue != null && !Bytes.equals(gotValue, thisValue)) {
277
278            StringBuilder msg = new StringBuilder();
279            msg.append("Failed on scan ").append(numScans).append(" after scanning ")
280              .append(numRowsScanned).append(" rows!\n");
281            msg.append("Current  was " + Bytes.toString(res.getRow()) + "/" + Bytes.toString(family)
282              + ":" + Bytes.toString(qualifier) + " = " + Bytes.toString(thisValue) + "\n");
283            msg.append("Previous  was " + Bytes.toString(lastRow) + "/" + Bytes.toString(lastFam)
284              + ":" + Bytes.toString(lastQual) + " = " + Bytes.toString(gotValue));
285            throw new RuntimeException(msg.toString());
286          }
287
288          lastFam = family;
289          lastQual = qualifier;
290          lastRow = res.getRow();
291          gotValue = thisValue;
292        }
293        numRowsScanned.getAndIncrement();
294      }
295      numScans.getAndIncrement();
296    }
297  }
298
299  /**
300   * Creates a table with given table name and specified number of column families if the table does
301   * not already exist.
302   */
303  public void setupTable(TableName table, int cfs) throws IOException {
304    try {
305      LOG.info("Creating table " + table);
306      TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(table);
307
308      tableDescriptorBuilder.setCoprocessor(MyObserver.class.getName());
309      MyObserver.sleepDuration = this.sleepDuration;
310      for (int i = 0; i < 10; i++) {
311        ColumnFamilyDescriptor columnFamilyDescriptor =
312          ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family(i))).build();
313        tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
314      }
315
316      UTIL.getAdmin().createTable(tableDescriptorBuilder.build());
317    } catch (TableExistsException tee) {
318      LOG.info("Table " + table + " already exists");
319    }
320  }
321
322  /**
323   * Atomic bulk load.
324   */
325  @Test
326  public void testAtomicBulkLoad() throws Exception {
327    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");
328
329    int millisToRun = 30000;
330    int numScanners = 50;
331
332    // Set createWALDir to true and use default values for other options.
333    UTIL.startMiniCluster(StartTestingClusterOption.builder().createWALDir(true).build());
334    try {
335      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
336      FindBulkHBaseListener listener = new FindBulkHBaseListener();
337      log.registerWALActionsListener(listener);
338      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
339      assertThat(listener.isFound(), is(true));
340    } finally {
341      UTIL.shutdownMiniCluster();
342    }
343  }
344
345  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
346    throws Exception {
347    setupTable(tableName, 10);
348
349    TestContext ctx = new TestContext(UTIL.getConfiguration());
350
351    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
352    ctx.addThread(loader);
353
354    List<AtomicScanReader> scanners = Lists.newArrayList();
355    for (int i = 0; i < numScanners; i++) {
356      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
357      scanners.add(scanner);
358      ctx.addThread(scanner);
359    }
360
361    ctx.startThreads();
362    ctx.waitFor(millisToRun);
363    ctx.stop();
364
365    LOG.info("Loaders:");
366    LOG.info("  loaded " + loader.numBulkLoads.get());
367    LOG.info("  compations " + loader.numCompactions.get());
368
369    LOG.info("Scanners:");
370    for (AtomicScanReader scanner : scanners) {
371      LOG.info("  scanned " + scanner.numScans.get());
372      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
373    }
374  }
375
376  /**
377   * Run test on an HBase instance for 5 minutes. This assumes that the table under test only has a
378   * single region.
379   */
380  public static void main(String args[]) throws Exception {
381    try {
382      Configuration c = HBaseConfiguration.create();
383      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
384      test.setConf(c);
385      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
386    } finally {
387      System.exit(0); // something hangs (believe it is lru threadpool)
388    }
389  }
390
391  private void setConf(Configuration c) {
392    UTIL = new HBaseTestingUtil(c);
393  }
394
395  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
396    private boolean found = false;
397
398    @Override
399    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
400      for (Cell cell : logEdit.getCells()) {
401        KeyValue kv = KeyValueUtil.ensureKeyValue((ExtendedCell) cell);
402        for (Map.Entry<String, Object> entry : kv.toStringMap().entrySet()) {
403          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
404            found = true;
405          }
406        }
407      }
408    }
409
410    public boolean isFound() {
411      return found;
412    }
413  }
414}