/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Test that a coprocessor can open a connection and write to another table, inside a hook.
 */
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestOpenTableInCoprocessor {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestOpenTableInCoprocessor.class);

  private static final TableName otherTable = TableName.valueOf("otherTable");
  private static final TableName primaryTable = TableName.valueOf("primary");
  private static final byte[] family = new byte[] { 'f' };

  private static final boolean[] completed = new boolean[1];

  /**
   * Custom coprocessor that just copies the write to another table.
   */
  public static class SendToOtherTableCoprocessor implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
      final WALEdit edit, final Durability durability) throws IOException {
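      // The environment's connection is the server's shared cluster connection; borrow a
      // Table from it and close only the Table, never the connection itself.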
      try (Table table = e.getEnvironment().getConnection().getTable(otherTable)) {
        table.put(put);
        completed[0] = true;
      }
    }

  }

  private static final boolean[] completedWithPool = new boolean[1];

  /**
   * Coprocessor that opens a Table backed by a custom thread pool to write to another table.
   */
  public static class CustomThreadPoolCoprocessor implements RegionCoprocessor, RegionObserver {

    /**
     * @return a pool that runs at most one task at a time. A second task submitted while the
     *         first is still running is rejected with a RejectedExecutionException.
     */
    private ExecutorService getPool() {
      int maxThreads = 1;
      long keepAliveTime = 60;
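      // A SynchronousQueue has no capacity, so with a single worker thread any task
      // submitted while that thread is busy is rejected instead of queued.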
      ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
        TimeUnit.SECONDS, new SynchronousQueue<>(),
        new ThreadFactoryBuilder().setNameFormat("hbase-table-pool-%d").setDaemon(true)
          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
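      // Let the lone core thread time out when idle so the pool holds no threads between puts.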
      pool.allowCoreThreadTimeOut(true);
      return pool;
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
      final WALEdit edit, final Durability durability) throws IOException {
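      // Passing the custom pool to getTable() makes the client run batch() multi-actions on
      // that executor rather than on a default pool.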
      try (Table table = e.getEnvironment().getConnection().getTable(otherTable, getPool())) {
        // Mirror the incoming Put into the other table with a batch call.
        try {
          table.batch(Collections.singletonList(put), null);
        } catch (InterruptedException e1) {
          throw new IOException(e1);
        }
        completedWithPool[0] = true;
      }
    }
  }

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  @BeforeClass
  public static void setupCluster() throws Exception {
    UTIL.startMiniCluster();
  }

  @After
  public void cleanupTestTable() throws Exception {
    UTIL.getAdmin().disableTable(primaryTable);
    UTIL.getAdmin().deleteTable(primaryTable);

    UTIL.getAdmin().disableTable(otherTable);
    UTIL.getAdmin().deleteTable(otherTable);
  }

  @AfterClass
  public static void teardownCluster() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Test
  public void testCoprocessorCanCreateConnectionToRemoteTable() throws Throwable {
    runCoprocessorConnectionToRemoteTable(SendToOtherTableCoprocessor.class, completed);
  }

  @Test
  public void testCoprocessorCanCreateConnectionToRemoteTableWithCustomPool() throws Throwable {
    runCoprocessorConnectionToRemoteTable(CustomThreadPoolCoprocessor.class, completedWithPool);
  }

  private void runCoprocessorConnectionToRemoteTable(Class<?> clazz, boolean[] completeCheck)
    throws Throwable {
    // Check that the given class implements RegionObserver.
    assertTrue("Coprocessor must be a RegionObserver",
      RegionObserver.class.isAssignableFrom(clazz));
    HTableDescriptor primary = new HTableDescriptor(primaryTable);
    primary.addFamily(new HColumnDescriptor(family));
    // add our coprocessor
    primary.addCoprocessor(clazz.getName());

    HTableDescriptor other = new HTableDescriptor(otherTable);
    other.addFamily(new HColumnDescriptor(family));

    Admin admin = UTIL.getAdmin();
    admin.createTable(primary);
    admin.createTable(other);

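    // Write a single cell to the primary table; the coprocessor hook mirrors it to otherTable.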
    try (Table table = UTIL.getConnection().getTable(primaryTable)) {
      Put p = new Put(new byte[] { 'a' });
      p.addColumn(family, null, new byte[] { 'a' });
      table.put(p);
    }

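    // prePut runs synchronously inside the put RPC, so the mirrored write is visible by now.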
    try (Table target = UTIL.getConnection().getTable(otherTable)) {
      assertTrue("Didn't complete update to target table!", completeCheck[0]);
      assertEquals("Didn't find inserted row", 1, getKeyValueCount(target));
    }
  }

  /**
   * Count the number of KeyValues in the table. Scans all possible versions.
   * @param table table to scan
   * @return number of KeyValues over all rows in the table
   */
  private int getKeyValueCount(Table table) throws IOException {
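    // Read every version of every cell so a duplicate write would show up as an extra count.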
    Scan scan = new Scan();
    scan.readAllVersions();

    int count = 0;
    try (ResultScanner results = table.getScanner(scan)) {
      for (Result res : results) {
        count += res.listCells().size();
        System.out.println(count + ") " + res);
      }
    }

    return count;
  }
}