001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.Collections; 025import java.util.Optional; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.SynchronousQueue; 028import java.util.concurrent.ThreadPoolExecutor; 029import java.util.concurrent.TimeUnit; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtil; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.Admin; 034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 035import org.apache.hadoop.hbase.client.Durability; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.Result; 038import org.apache.hadoop.hbase.client.ResultScanner; 039import org.apache.hadoop.hbase.client.Scan; 040import org.apache.hadoop.hbase.client.Table; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 043import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.apache.hadoop.hbase.util.Threads; 046import org.apache.hadoop.hbase.wal.WALEdit; 047import org.junit.After; 048import org.junit.AfterClass; 049import org.junit.BeforeClass; 050import org.junit.ClassRule; 051import org.junit.Test; 052import org.junit.experimental.categories.Category; 053 054import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 055 056/** 057 * Test that a coprocessor can open a connection and write to another table, inside a hook. 058 */ 059@Category({ CoprocessorTests.class, MediumTests.class }) 060public class TestOpenTableInCoprocessor { 061 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestOpenTableInCoprocessor.class); 065 066 private static final TableName otherTable = TableName.valueOf("otherTable"); 067 private static final TableName primaryTable = TableName.valueOf("primary"); 068 private static final byte[] family = new byte[] { 'f' }; 069 070 private static boolean[] completed = new boolean[1]; 071 072 /** 073 * Custom coprocessor that just copies the write to another table. 074 */ 075 public static class SendToOtherTableCoprocessor implements RegionCoprocessor, RegionObserver { 076 077 @Override 078 public Optional<RegionObserver> getRegionObserver() { 079 return Optional.of(this); 080 } 081 082 @Override 083 public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 084 final Put put, final WALEdit edit, final Durability durability) throws IOException { 085 try (Table table = e.getEnvironment().getConnection().getTable(otherTable)) { 086 table.put(put); 087 completed[0] = true; 088 } 089 } 090 091 } 092 093 private static boolean[] completedWithPool = new boolean[1]; 094 095 /** 096 * Coprocessor that creates an HTable with a pool to write to another table 097 */ 098 public static class CustomThreadPoolCoprocessor implements RegionCoprocessor, RegionObserver { 099 100 /** 101 * @return a pool that has one thread only at every time. A second action added to the pool ( 102 * running concurrently), will cause an exception. 103 */ 104 private ExecutorService getPool() { 105 int maxThreads = 1; 106 long keepAliveTime = 60; 107 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, 108 TimeUnit.SECONDS, new SynchronousQueue<>(), 109 new ThreadFactoryBuilder().setNameFormat("hbase-table-pool-%d").setDaemon(true) 110 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 111 pool.allowCoreThreadTimeOut(true); 112 return pool; 113 } 114 115 @Override 116 public Optional<RegionObserver> getRegionObserver() { 117 return Optional.of(this); 118 } 119 120 @Override 121 public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 122 final Put put, final WALEdit edit, final Durability durability) throws IOException { 123 try (Table table = e.getEnvironment().getConnection().getTable(otherTable, getPool())) { 124 Put p = new Put(new byte[] { 'a' }); 125 p.addColumn(family, null, new byte[] { 'a' }); 126 try { 127 table.batch(Collections.singletonList(put), null); 128 } catch (InterruptedException e1) { 129 throw new IOException(e1); 130 } 131 completedWithPool[0] = true; 132 } 133 } 134 } 135 136 private static HBaseTestingUtil UTIL = new HBaseTestingUtil(); 137 138 @BeforeClass 139 public static void setupCluster() throws Exception { 140 UTIL.startMiniCluster(); 141 } 142 143 @After 144 public void cleanupTestTable() throws Exception { 145 UTIL.getAdmin().disableTable(primaryTable); 146 UTIL.getAdmin().deleteTable(primaryTable); 147 148 UTIL.getAdmin().disableTable(otherTable); 149 UTIL.getAdmin().deleteTable(otherTable); 150 151 } 152 153 @AfterClass 154 public static void teardownCluster() throws Exception { 155 UTIL.shutdownMiniCluster(); 156 } 157 158 @Test 159 public void testCoprocessorCanCreateConnectionToRemoteTable() throws Throwable { 160 runCoprocessorConnectionToRemoteTable(SendToOtherTableCoprocessor.class, completed); 161 } 162 163 @Test 164 public void testCoprocessorCanCreateConnectionToRemoteTableWithCustomPool() throws Throwable { 165 runCoprocessorConnectionToRemoteTable(CustomThreadPoolCoprocessor.class, completedWithPool); 166 } 167 168 private void runCoprocessorConnectionToRemoteTable(Class<?> clazz, boolean[] completeCheck) 169 throws Throwable { 170 // Check if given class implements RegionObserver. 171 assert (RegionObserver.class.isAssignableFrom(clazz)); 172 // add our coprocessor 173 TableDescriptor primaryDescriptor = TableDescriptorBuilder.newBuilder(primaryTable) 174 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCoprocessor(clazz.getName()) 175 .build(); 176 177 TableDescriptor otherDescriptor = TableDescriptorBuilder.newBuilder(otherTable) 178 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); 179 180 Admin admin = UTIL.getAdmin(); 181 admin.createTable(primaryDescriptor); 182 admin.createTable(otherDescriptor); 183 184 Table table = UTIL.getConnection().getTable(TableName.valueOf("primary")); 185 Put p = new Put(new byte[] { 'a' }); 186 p.addColumn(family, null, new byte[] { 'a' }); 187 table.put(p); 188 table.close(); 189 190 Table target = UTIL.getConnection().getTable(otherTable); 191 assertTrue("Didn't complete update to target table!", completeCheck[0]); 192 assertEquals("Didn't find inserted row", 1, getKeyValueCount(target)); 193 target.close(); 194 } 195 196 /** 197 * Count the number of keyvalue in the table. Scans all possible versions 198 * @param table table to scan 199 * @return number of keyvalues over all rows in the table 200 */ 201 private int getKeyValueCount(Table table) throws IOException { 202 Scan scan = new Scan(); 203 scan.readVersions(Integer.MAX_VALUE - 1); 204 205 ResultScanner results = table.getScanner(scan); 206 int count = 0; 207 for (Result res : results) { 208 count += res.listCells().size(); 209 System.out.println(count + ") " + res); 210 } 211 results.close(); 212 213 return count; 214 } 215}