/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;

/**
 * Tests bulk loading of HFiles with the old non-secure client, kept for backward compatibility.
 * Will be removed once the old non-secure client is no longer supported.
 */
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, LargeTests.class })
public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBulkLoad {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldClient.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestHRegionServerBulkLoadWithOldClient.class);

  public TestHRegionServerBulkLoadWithOldClient(int duration) {
    super(duration);
  }

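  /**
   * Writer thread: each iteration creates one fresh HFile per column family, bulk loads them into
   * the table through the old non-secure client RPC, and triggers a region compaction every fifth
   * load to keep the number of open store files down.
   */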
  public static class AtomicHFileLoader extends RepeatingTestThread {
    final AtomicLong numBulkLoads = new AtomicLong();
    final AtomicLong numCompactions = new AtomicLong();
    private TableName tableName;

    public AtomicHFileLoader(TableName tableName, TestContext ctx, byte[][] targetFamilies)
      throws IOException {
      super(ctx);
      this.tableName = tableName;
    }

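    /**
     * One loader iteration: write a new set of HFiles into a per-iteration directory on the test
     * filesystem, then bulk load them into the region containing row "aaa".
     */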
    @Override
    public void doAnAction() throws Exception {
      long iteration = numBulkLoads.getAndIncrement();
      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));

      // create HFiles for different column families
      FileSystem fs = UTIL.getTestFileSystem();
      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
      final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
      for (int i = 0; i < NUM_CFS; i++) {
        Path hfile = new Path(dir, family(i));
        byte[] fam = Bytes.toBytes(family(i));
        createHFile(fs, hfile, fam, QUAL, val, 1000);
        famPaths.add(new Pair<>(fam, hfile.toString()));
      }

      // bulk load HFiles
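      // Issue the bulkLoadHFile RPC directly through the old non-secure client stub against the
      // region server hosting the region that contains row "aaa".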
      final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
      RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
      ClientServiceCallable<Void> callable =
        new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
          rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET, Collections.emptyMap()) {
          @Override
          protected Void rpcCall() throws Exception {
            LOG.info("Non-secure old client");
            byte[] regionName = getLocation().getRegionInfo().getRegionName();
            BulkLoadHFileRequest request =
              RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true, null, null);
            getStub().bulkLoadHFile(null, request);
            return null;
          }
        };
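      // Run the bulk load through an RpcRetryingCaller so transient failures are retried rather
      // than immediately failing the loader thread.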
      RpcRetryingCallerFactory factory =
        new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration());
      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
      caller.callWithRetries(callable, Integer.MAX_VALUE);

      // Periodically do compaction to reduce the number of open file handles.
      if (numBulkLoads.get() % 5 == 0) {
        // 5 * 50 = 250 open file handles!
        callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
          rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET, Collections.emptyMap()) {
          @Override
          protected Void rpcCall() throws Exception {
            LOG.debug("compacting " + getLocation() + " for row " + Bytes.toStringBinary(getRow()));
            AdminProtos.AdminService.BlockingInterface server =
              conn.getAdmin(getLocation().getServerName());
            CompactRegionRequest request = RequestConverter
              .buildCompactRegionRequest(getLocation().getRegionInfo().getRegionName(), true, null);
            server.compactRegion(null, request);
            numCompactions.incrementAndGet();
            return null;
          }
        };
        caller.callWithRetries(callable, Integer.MAX_VALUE);
      }
    }
  }

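  /**
   * Runs a single {@link AtomicHFileLoader} thread concurrently with {@code numScanners}
   * {@link AtomicScanReader} threads for {@code millisToRun} milliseconds, then logs how many bulk
   * loads, compactions, and scans completed.
   */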
  @Override
  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
    throws Exception {
    setupTable(tableName, 10);

    TestContext ctx = new TestContext(UTIL.getConfiguration());

    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
    ctx.addThread(loader);

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Loaders:");
    LOG.info("  loaded " + loader.numBulkLoads.get());
    LOG.info("  compactions " + loader.numCompactions.get());

    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info("  scanned " + scanner.numScans.get());
      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
    }
  }
}