001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.Collections; 025import java.util.Optional; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.SynchronousQueue; 028import java.util.concurrent.ThreadPoolExecutor; 029import java.util.concurrent.TimeUnit; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtility; 032import org.apache.hadoop.hbase.HColumnDescriptor; 033import org.apache.hadoop.hbase.HTableDescriptor; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Admin; 036import org.apache.hadoop.hbase.client.Durability; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.client.Result; 039import org.apache.hadoop.hbase.client.ResultScanner; 040import org.apache.hadoop.hbase.client.Scan; 041import org.apache.hadoop.hbase.client.Table; 042import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 043import org.apache.hadoop.hbase.testclassification.MediumTests; 044import org.apache.hadoop.hbase.util.Threads; 045import org.apache.hadoop.hbase.wal.WALEdit; 046import org.junit.After; 047import org.junit.AfterClass; 048import org.junit.BeforeClass; 049import org.junit.ClassRule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052 053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 054 055/** 056 * Test that a coprocessor can open a connection and write to another table, inside a hook. 057 */ 058@Category({ CoprocessorTests.class, MediumTests.class }) 059public class TestOpenTableInCoprocessor { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestOpenTableInCoprocessor.class); 064 065 private static final TableName otherTable = TableName.valueOf("otherTable"); 066 private static final TableName primaryTable = TableName.valueOf("primary"); 067 private static final byte[] family = new byte[] { 'f' }; 068 069 private static boolean[] completed = new boolean[1]; 070 071 /** 072 * Custom coprocessor that just copies the write to another table. 073 */ 074 public static class SendToOtherTableCoprocessor implements RegionCoprocessor, RegionObserver { 075 076 @Override 077 public Optional<RegionObserver> getRegionObserver() { 078 return Optional.of(this); 079 } 080 081 @Override 082 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, 083 final WALEdit edit, final Durability durability) throws IOException { 084 try (Table table = e.getEnvironment().getConnection().getTable(otherTable)) { 085 table.put(put); 086 completed[0] = true; 087 } 088 } 089 090 } 091 092 private static boolean[] completedWithPool = new boolean[1]; 093 094 /** 095 * Coprocessor that creates an HTable with a pool to write to another table 096 */ 097 public static class CustomThreadPoolCoprocessor implements RegionCoprocessor, RegionObserver { 098 099 /** 100 * @return a pool that has one thread only at every time. A second action added to the pool ( 101 * running concurrently), will cause an exception. 102 */ 103 private ExecutorService getPool() { 104 int maxThreads = 1; 105 long keepAliveTime = 60; 106 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, 107 TimeUnit.SECONDS, new SynchronousQueue<>(), 108 new ThreadFactoryBuilder().setNameFormat("hbase-table-pool-%d").setDaemon(true) 109 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 110 pool.allowCoreThreadTimeOut(true); 111 return pool; 112 } 113 114 @Override 115 public Optional<RegionObserver> getRegionObserver() { 116 return Optional.of(this); 117 } 118 119 @Override 120 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, 121 final WALEdit edit, final Durability durability) throws IOException { 122 try (Table table = e.getEnvironment().getConnection().getTable(otherTable, getPool())) { 123 Put p = new Put(new byte[] { 'a' }); 124 p.addColumn(family, null, new byte[] { 'a' }); 125 try { 126 table.batch(Collections.singletonList(put), null); 127 } catch (InterruptedException e1) { 128 throw new IOException(e1); 129 } 130 completedWithPool[0] = true; 131 } 132 } 133 } 134 135 private static HBaseTestingUtility UTIL = new HBaseTestingUtility(); 136 137 @BeforeClass 138 public static void setupCluster() throws Exception { 139 UTIL.startMiniCluster(); 140 } 141 142 @After 143 public void cleanupTestTable() throws Exception { 144 UTIL.getAdmin().disableTable(primaryTable); 145 UTIL.getAdmin().deleteTable(primaryTable); 146 147 UTIL.getAdmin().disableTable(otherTable); 148 UTIL.getAdmin().deleteTable(otherTable); 149 150 } 151 152 @AfterClass 153 public static void teardownCluster() throws Exception { 154 UTIL.shutdownMiniCluster(); 155 } 156 157 @Test 158 public void testCoprocessorCanCreateConnectionToRemoteTable() throws Throwable { 159 runCoprocessorConnectionToRemoteTable(SendToOtherTableCoprocessor.class, completed); 160 } 161 162 @Test 163 public void testCoprocessorCanCreateConnectionToRemoteTableWithCustomPool() throws Throwable { 164 runCoprocessorConnectionToRemoteTable(CustomThreadPoolCoprocessor.class, completedWithPool); 165 } 166 167 private void runCoprocessorConnectionToRemoteTable(Class clazz, boolean[] completeCheck) 168 throws Throwable { 169 // Check if given class implements RegionObserver. 170 assert (RegionObserver.class.isAssignableFrom(clazz)); 171 HTableDescriptor primary = new HTableDescriptor(primaryTable); 172 primary.addFamily(new HColumnDescriptor(family)); 173 // add our coprocessor 174 primary.addCoprocessor(clazz.getName()); 175 176 HTableDescriptor other = new HTableDescriptor(otherTable); 177 other.addFamily(new HColumnDescriptor(family)); 178 179 Admin admin = UTIL.getAdmin(); 180 admin.createTable(primary); 181 admin.createTable(other); 182 183 Table table = UTIL.getConnection().getTable(TableName.valueOf("primary")); 184 Put p = new Put(new byte[] { 'a' }); 185 p.addColumn(family, null, new byte[] { 'a' }); 186 table.put(p); 187 table.close(); 188 189 Table target = UTIL.getConnection().getTable(otherTable); 190 assertTrue("Didn't complete update to target table!", completeCheck[0]); 191 assertEquals("Didn't find inserted row", 1, getKeyValueCount(target)); 192 target.close(); 193 } 194 195 /** 196 * Count the number of keyvalue in the table. Scans all possible versions 197 * @param table table to scan 198 * @return number of keyvalues over all rows in the table 199 */ 200 private int getKeyValueCount(Table table) throws IOException { 201 Scan scan = new Scan(); 202 scan.setMaxVersions(Integer.MAX_VALUE - 1); 203 204 ResultScanner results = table.getScanner(scan); 205 int count = 0; 206 for (Result res : results) { 207 count += res.listCells().size(); 208 System.out.println(count + ") " + res); 209 } 210 results.close(); 211 212 return count; 213 } 214}