001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.core.Is.is;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.List;
030import java.util.Map;
031import java.util.Optional;
032import java.util.concurrent.atomic.AtomicLong;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HBaseTestingUtility;
040import org.apache.hadoop.hbase.HColumnDescriptor;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.HTableDescriptor;
043import org.apache.hadoop.hbase.KeyValue;
044import org.apache.hadoop.hbase.KeyValueUtil;
045import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
046import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
047import org.apache.hadoop.hbase.StartMiniClusterOption;
048import org.apache.hadoop.hbase.TableExistsException;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.ClientServiceCallable;
051import org.apache.hadoop.hbase.client.ClusterConnection;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.Result;
054import org.apache.hadoop.hbase.client.ResultScanner;
055import org.apache.hadoop.hbase.client.RpcRetryingCaller;
056import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
057import org.apache.hadoop.hbase.client.Scan;
058import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
059import org.apache.hadoop.hbase.client.Table;
060import org.apache.hadoop.hbase.coprocessor.ObserverContext;
061import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
062import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
063import org.apache.hadoop.hbase.coprocessor.RegionObserver;
064import org.apache.hadoop.hbase.io.compress.Compression;
065import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
066import org.apache.hadoop.hbase.io.hfile.CacheConfig;
067import org.apache.hadoop.hbase.io.hfile.HFile;
068import org.apache.hadoop.hbase.io.hfile.HFileContext;
069import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
070import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
071import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
072import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
073import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
074import org.apache.hadoop.hbase.testclassification.LargeTests;
075import org.apache.hadoop.hbase.testclassification.RegionServerTests;
076import org.apache.hadoop.hbase.util.Bytes;
077import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
078import org.apache.hadoop.hbase.util.Pair;
079import org.apache.hadoop.hbase.wal.WAL;
080import org.apache.hadoop.hbase.wal.WALEdit;
081import org.apache.hadoop.hbase.wal.WALKey;
082import org.junit.BeforeClass;
083import org.junit.ClassRule;
084import org.junit.Test;
085import org.junit.experimental.categories.Category;
086import org.junit.runner.RunWith;
087import org.junit.runners.Parameterized;
088import org.junit.runners.Parameterized.Parameters;
089import org.slf4j.Logger;
090import org.slf4j.LoggerFactory;
091
092import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
093
094import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
097
/**
 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of the region server's
 * bulkLoad functionality.
 */
102@RunWith(Parameterized.class)
103@Category({ RegionServerTests.class, LargeTests.class })
104public class TestHRegionServerBulkLoad {
105
106  @ClassRule
107  public static final HBaseClassTestRule CLASS_RULE =
108    HBaseClassTestRule.forClass(TestHRegionServerBulkLoad.class);
109
110  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
111  protected static HBaseTestingUtility UTIL = new HBaseTestingUtility();
112  protected final static Configuration conf = UTIL.getConfiguration();
113  protected final static byte[] QUAL = Bytes.toBytes("qual");
114  protected final static int NUM_CFS = 10;
115  private int sleepDuration;
116  public static int BLOCKSIZE = 64 * 1024;
117  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
118
119  protected final static byte[][] families = new byte[NUM_CFS][];
120  static {
121    for (int i = 0; i < NUM_CFS; i++) {
122      families[i] = Bytes.toBytes(family(i));
123    }
124  }
125
126  @Parameters
127  public static final Collection<Object[]> parameters() {
128    int[] sleepDurations = new int[] { 0, 30000 };
129    List<Object[]> configurations = new ArrayList<>();
130    for (int i : sleepDurations) {
131      configurations.add(new Object[] { i });
132    }
133    return configurations;
134  }
135
136  public TestHRegionServerBulkLoad(int duration) {
137    this.sleepDuration = duration;
138  }
139
140  @BeforeClass
141  public static void setUpBeforeClass() throws Exception {
142    conf.setInt("hbase.rpc.timeout", 10 * 1000);
143  }
144
145  /**
146   * Create a rowkey compatible with
147   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
148   */
149  public static byte[] rowkey(int i) {
150    return Bytes.toBytes(String.format("row_%08d", i));
151  }
152
153  static String family(int i) {
154    return String.format("family_%04d", i);
155  }
156
157  /**
158   * Create an HFile with the given number of rows with a specified value.
159   */
160  public static void createHFile(FileSystem fs, Path path, byte[] family, byte[] qualifier,
161    byte[] value, int numRows) throws IOException {
162    HFileContext context =
163      new HFileContextBuilder().withBlockSize(BLOCKSIZE).withCompression(COMPRESSION).build();
164    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
165      .withFileContext(context).create();
166    long now = EnvironmentEdgeManager.currentTime();
167    try {
168      // subtract 2 since iterateOnSplits doesn't include boundary keys
169      for (int i = 0; i < numRows; i++) {
170        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
171        writer.append(kv);
172      }
173      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now));
174    } finally {
175      writer.close();
176    }
177  }
178
179  /**
180   * Thread that does full scans of the table looking for any partially completed rows. Each
181   * iteration of this loads 10 hdfs files, which occupies 5 file open file handles. So every 10
182   * iterations (500 file handles) it does a region compaction to reduce the number of open file
183   * handles.
184   */
185  public static class AtomicHFileLoader extends RepeatingTestThread {
186    final AtomicLong numBulkLoads = new AtomicLong();
187    final AtomicLong numCompactions = new AtomicLong();
188    private TableName tableName;
189
190    public AtomicHFileLoader(TableName tableName, TestContext ctx, byte targetFamilies[][])
191      throws IOException {
192      super(ctx);
193      this.tableName = tableName;
194    }
195
196    @Override
197    public void doAnAction() throws Exception {
198      long iteration = numBulkLoads.getAndIncrement();
199      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));
200
201      // create HFiles for different column families
202      FileSystem fs = UTIL.getTestFileSystem();
203      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
204      final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
205      for (int i = 0; i < NUM_CFS; i++) {
206        Path hfile = new Path(dir, family(i));
207        byte[] fam = Bytes.toBytes(family(i));
208        createHFile(fs, hfile, fam, QUAL, val, 1000);
209        famPaths.add(new Pair<>(fam, hfile.toString()));
210      }
211
212      // bulk load HFiles
213      final ClusterConnection conn = (ClusterConnection) UTIL.getConnection();
214      Table table = conn.getTable(tableName);
215      final String bulkToken =
216        new SecureBulkLoadClient(UTIL.getConfiguration(), table).prepareBulkLoad(conn);
217      ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, tableName,
218        Bytes.toBytes("aaa"), new RpcControllerFactory(UTIL.getConfiguration()).newController(),
219        HConstants.PRIORITY_UNSET, Collections.emptyMap()) {
220        @Override
221        public Void rpcCall() throws Exception {
222          LOG.debug("Going to connect to server " + getLocation() + " for row "
223            + Bytes.toStringBinary(getRow()));
224          SecureBulkLoadClient secureClient = null;
225          byte[] regionName = getLocation().getRegionInfo().getRegionName();
226          try (Table table = conn.getTable(getTableName())) {
227            secureClient = new SecureBulkLoadClient(UTIL.getConfiguration(), table);
228            secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, true, null,
229              bulkToken);
230          }
231          return null;
232        }
233      };
234      RpcRetryingCallerFactory factory =
235        new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration());
236      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
237      caller.callWithRetries(callable, Integer.MAX_VALUE);
238
239      // Periodically do compaction to reduce the number of open file handles.
240      if (numBulkLoads.get() % 5 == 0) {
241        // 5 * 50 = 250 open file handles!
242        callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
243          new RpcControllerFactory(UTIL.getConfiguration()).newController(),
244          HConstants.PRIORITY_UNSET, Collections.emptyMap()) {
245          @Override
246          protected Void rpcCall() throws Exception {
247            LOG.debug("compacting " + getLocation() + " for row " + Bytes.toStringBinary(getRow()));
248            AdminProtos.AdminService.BlockingInterface server =
249              conn.getAdmin(getLocation().getServerName());
250            CompactRegionRequest request = RequestConverter
251              .buildCompactRegionRequest(getLocation().getRegionInfo().getRegionName(), true, null);
252            server.compactRegion(null, request);
253            numCompactions.incrementAndGet();
254            return null;
255          }
256        };
257        caller.callWithRetries(callable, Integer.MAX_VALUE);
258      }
259    }
260  }
261
262  public static class MyObserver implements RegionCoprocessor, RegionObserver {
263    static int sleepDuration;
264
265    @Override
266    public Optional<RegionObserver> getRegionObserver() {
267      return Optional.of(this);
268    }
269
270    @Override
271    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
272      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
273      CompactionRequest request) throws IOException {
274      try {
275        Thread.sleep(sleepDuration);
276      } catch (InterruptedException ie) {
277        IOException ioe = new InterruptedIOException();
278        ioe.initCause(ie);
279        throw ioe;
280      }
281      return scanner;
282    }
283  }
284
285  /**
286   * Thread that does full scans of the table looking for any partially completed rows.
287   */
288  public static class AtomicScanReader extends RepeatingTestThread {
289    byte targetFamilies[][];
290    Table table;
291    AtomicLong numScans = new AtomicLong();
292    AtomicLong numRowsScanned = new AtomicLong();
293    TableName TABLE_NAME;
294
295    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx, byte targetFamilies[][])
296      throws IOException {
297      super(ctx);
298      this.TABLE_NAME = TABLE_NAME;
299      this.targetFamilies = targetFamilies;
300      table = UTIL.getConnection().getTable(TABLE_NAME);
301    }
302
303    @Override
304    public void doAnAction() throws Exception {
305      Scan s = new Scan();
306      for (byte[] family : targetFamilies) {
307        s.addFamily(family);
308      }
309      ResultScanner scanner = table.getScanner(s);
310
311      for (Result res : scanner) {
312        byte[] lastRow = null, lastFam = null, lastQual = null;
313        byte[] gotValue = null;
314        for (byte[] family : targetFamilies) {
315          byte qualifier[] = QUAL;
316          byte thisValue[] = res.getValue(family, qualifier);
317          if (gotValue != null && thisValue != null && !Bytes.equals(gotValue, thisValue)) {
318
319            StringBuilder msg = new StringBuilder();
320            msg.append("Failed on scan ").append(numScans).append(" after scanning ")
321              .append(numRowsScanned).append(" rows!\n");
322            msg.append("Current  was " + Bytes.toString(res.getRow()) + "/" + Bytes.toString(family)
323              + ":" + Bytes.toString(qualifier) + " = " + Bytes.toString(thisValue) + "\n");
324            msg.append("Previous  was " + Bytes.toString(lastRow) + "/" + Bytes.toString(lastFam)
325              + ":" + Bytes.toString(lastQual) + " = " + Bytes.toString(gotValue));
326            throw new RuntimeException(msg.toString());
327          }
328
329          lastFam = family;
330          lastQual = qualifier;
331          lastRow = res.getRow();
332          gotValue = thisValue;
333        }
334        numRowsScanned.getAndIncrement();
335      }
336      numScans.getAndIncrement();
337    }
338  }
339
340  /**
341   * Creates a table with given table name and specified number of column families if the table does
342   * not already exist.
343   */
344  public void setupTable(TableName table, int cfs) throws IOException {
345    try {
346      LOG.info("Creating table " + table);
347      HTableDescriptor htd = new HTableDescriptor(table);
348      htd.addCoprocessor(MyObserver.class.getName());
349      MyObserver.sleepDuration = this.sleepDuration;
350      for (int i = 0; i < 10; i++) {
351        htd.addFamily(new HColumnDescriptor(family(i)));
352      }
353
354      UTIL.getAdmin().createTable(htd);
355    } catch (TableExistsException tee) {
356      LOG.info("Table " + table + " already exists");
357    }
358  }
359
360  /**
361   * Atomic bulk load.
362   */
363  @Test
364  public void testAtomicBulkLoad() throws Exception {
365    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");
366
367    int millisToRun = 30000;
368    int numScanners = 50;
369
370    // Set createWALDir to true and use default values for other options.
371    UTIL.startMiniCluster(StartMiniClusterOption.builder().createWALDir(true).build());
372    try {
373      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
374      FindBulkHBaseListener listener = new FindBulkHBaseListener();
375      log.registerWALActionsListener(listener);
376      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
377      assertThat(listener.isFound(), is(true));
378    } finally {
379      UTIL.shutdownMiniCluster();
380    }
381  }
382
383  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
384    throws Exception {
385    setupTable(tableName, 10);
386
387    TestContext ctx = new TestContext(UTIL.getConfiguration());
388
389    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
390    ctx.addThread(loader);
391
392    List<AtomicScanReader> scanners = Lists.newArrayList();
393    for (int i = 0; i < numScanners; i++) {
394      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
395      scanners.add(scanner);
396      ctx.addThread(scanner);
397    }
398
399    ctx.startThreads();
400    ctx.waitFor(millisToRun);
401    ctx.stop();
402
403    LOG.info("Loaders:");
404    LOG.info("  loaded " + loader.numBulkLoads.get());
405    LOG.info("  compations " + loader.numCompactions.get());
406
407    LOG.info("Scanners:");
408    for (AtomicScanReader scanner : scanners) {
409      LOG.info("  scanned " + scanner.numScans.get());
410      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
411    }
412  }
413
414  /**
415   * Run test on an HBase instance for 5 minutes. This assumes that the table under test only has a
416   * single region.
417   */
418  public static void main(String args[]) throws Exception {
419    try {
420      Configuration c = HBaseConfiguration.create();
421      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
422      test.setConf(c);
423      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
424    } finally {
425      System.exit(0); // something hangs (believe it is lru threadpool)
426    }
427  }
428
429  private void setConf(Configuration c) {
430    UTIL = new HBaseTestingUtility(c);
431  }
432
433  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
434    private boolean found = false;
435
436    @Override
437    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
438      for (Cell cell : logEdit.getCells()) {
439        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
440        for (Map.Entry entry : kv.toStringMap().entrySet()) {
441          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
442            found = true;
443          }
444        }
445      }
446    }
447
448    public boolean isFound() {
449      return found;
450    }
451  }
452}