001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static junit.framework.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Map;
026import java.util.Optional;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Admin;
035import org.apache.hadoop.hbase.client.Delete;
036import org.apache.hadoop.hbase.client.Durability;
037import org.apache.hadoop.hbase.client.Get;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.Result;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
042import org.apache.hadoop.hbase.testclassification.MediumTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
046import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
047import org.apache.hadoop.hbase.wal.WALEdit;
048import org.junit.AfterClass;
049import org.junit.Before;
050import org.junit.BeforeClass;
051import org.junit.ClassRule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054
055@Category({ CoprocessorTests.class, MediumTests.class })
056public class TestRegionObserverBypass {
057
  // Class-level rule obtained from HBaseClassTestRule.forClass for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionObserverBypass.class);

  // Shared testing utility: created and started in setUpBeforeClass, shut down in
  // tearDownAfterClass.
  private static HBaseTestingUtility util;
  // Table that setUp drops (when present) and recreates before every test.
  private static final TableName tableName = TableName.valueOf("test");
  // Column family that does not trigger a bypass; also reused as qualifier and value of each put.
  private static final byte[] dummy = Bytes.toBytes("dummy");
  // Row keys written by the tests.
  private static final byte[] row1 = Bytes.toBytes("r1");
  private static final byte[] row2 = Bytes.toBytes("r2");
  private static final byte[] row3 = Bytes.toBytes("r3");
  // Column family that makes TestCoprocessor.prePut call bypass when a Put contains it.
  private static final byte[] test = Bytes.toBytes("test");
069
070  @BeforeClass
071  public static void setUpBeforeClass() throws Exception {
072    // Stack up three coprocessors just so I can check bypass skips subsequent calls.
073    Configuration conf = HBaseConfiguration.create();
074    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
075      new String[] { TestCoprocessor.class.getName(), TestCoprocessor2.class.getName(),
076        TestCoprocessor3.class.getName() });
077    util = new HBaseTestingUtility(conf);
078    util.startMiniCluster();
079  }
080
081  @AfterClass
082  public static void tearDownAfterClass() throws Exception {
083    util.shutdownMiniCluster();
084  }
085
086  @Before
087  public void setUp() throws Exception {
088    Admin admin = util.getAdmin();
089    if (admin.tableExists(tableName)) {
090      if (admin.isTableEnabled(tableName)) {
091        admin.disableTable(tableName);
092      }
093      admin.deleteTable(tableName);
094    }
095    util.createTable(tableName, new byte[][] { dummy, test });
096    TestCoprocessor.PREPUT_BYPASSES.set(0);
097    TestCoprocessor.PREPUT_INVOCATIONS.set(0);
098  }
099
100  /**
101   * do a single put that is bypassed by a RegionObserver
102   */
103  @Test
104  public void testSimple() throws Exception {
105    Table t = util.getConnection().getTable(tableName);
106    Put p = new Put(row1);
107    p.addColumn(test, dummy, dummy);
108    // before HBASE-4331, this would throw an exception
109    t.put(p);
110    checkRowAndDelete(t, row1, 0);
111    t.close();
112  }
113
114  /**
115   * Test various multiput operations. If the column family is 'test', then bypass is invoked.
116   */
117  @Test
118  public void testMulti() throws Exception {
119    // ensure that server time increments every time we do an operation, otherwise
120    // previous deletes will eclipse successive puts having the same timestamp
121    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
122
123    Table t = util.getConnection().getTable(tableName);
124    List<Put> puts = new ArrayList<>();
125    Put p = new Put(row1);
126    p.addColumn(dummy, dummy, dummy);
127    puts.add(p);
128    p = new Put(row2);
129    p.addColumn(test, dummy, dummy);
130    puts.add(p);
131    p = new Put(row3);
132    p.addColumn(test, dummy, dummy);
133    puts.add(p);
134    // before HBASE-4331, this would throw an exception
135    t.put(puts);
136    checkRowAndDelete(t, row1, 1);
137    checkRowAndDelete(t, row2, 0);
138    checkRowAndDelete(t, row3, 0);
139
140    puts.clear();
141    p = new Put(row1);
142    p.addColumn(test, dummy, dummy);
143    puts.add(p);
144    p = new Put(row2);
145    p.addColumn(test, dummy, dummy);
146    puts.add(p);
147    p = new Put(row3);
148    p.addColumn(test, dummy, dummy);
149    puts.add(p);
150    // before HBASE-4331, this would throw an exception
151    t.put(puts);
152    checkRowAndDelete(t, row1, 0);
153    checkRowAndDelete(t, row2, 0);
154    checkRowAndDelete(t, row3, 0);
155
156    puts.clear();
157    p = new Put(row1);
158    p.addColumn(test, dummy, dummy);
159    puts.add(p);
160    p = new Put(row2);
161    p.addColumn(test, dummy, dummy);
162    puts.add(p);
163    p = new Put(row3);
164    p.addColumn(dummy, dummy, dummy);
165    puts.add(p);
166    // this worked fine even before HBASE-4331
167    t.put(puts);
168    checkRowAndDelete(t, row1, 0);
169    checkRowAndDelete(t, row2, 0);
170    checkRowAndDelete(t, row3, 1);
171
172    puts.clear();
173    p = new Put(row1);
174    p.addColumn(dummy, dummy, dummy);
175    puts.add(p);
176    p = new Put(row2);
177    p.addColumn(test, dummy, dummy);
178    puts.add(p);
179    p = new Put(row3);
180    p.addColumn(dummy, dummy, dummy);
181    puts.add(p);
182    // this worked fine even before HBASE-4331
183    t.put(puts);
184    checkRowAndDelete(t, row1, 1);
185    checkRowAndDelete(t, row2, 0);
186    checkRowAndDelete(t, row3, 1);
187
188    puts.clear();
189    p = new Put(row1);
190    p.addColumn(test, dummy, dummy);
191    puts.add(p);
192    p = new Put(row2);
193    p.addColumn(dummy, dummy, dummy);
194    puts.add(p);
195    p = new Put(row3);
196    p.addColumn(test, dummy, dummy);
197    puts.add(p);
198    // before HBASE-4331, this would throw an exception
199    t.put(puts);
200    checkRowAndDelete(t, row1, 0);
201    checkRowAndDelete(t, row2, 1);
202    checkRowAndDelete(t, row3, 0);
203    t.close();
204
205    EnvironmentEdgeManager.reset();
206  }
207
208  private void checkRowAndDelete(Table t, byte[] row, int count) throws IOException {
209    Get g = new Get(row);
210    Result r = t.get(g);
211    assertEquals(count, r.size());
212    Delete d = new Delete(row);
213    t.delete(d);
214  }
215
216  /**
217   * Test that when bypass is called, we skip out calling any other coprocessors stacked up method,
218   * in this case, a prePut. If the column family is 'test', then bypass is invoked.
219   */
220  @Test
221  public void testBypassAlsoCompletes() throws IOException {
222    // ensure that server time increments every time we do an operation, otherwise
223    // previous deletes will eclipse successive puts having the same timestamp
224    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
225
226    Table t = util.getConnection().getTable(tableName);
227    List<Put> puts = new ArrayList<>();
228    Put p = new Put(row1);
229    p.addColumn(dummy, dummy, dummy);
230    puts.add(p);
231    p = new Put(row2);
232    p.addColumn(test, dummy, dummy);
233    puts.add(p);
234    p = new Put(row3);
235    p.addColumn(test, dummy, dummy);
236    puts.add(p);
237    t.put(puts);
238    // Ensure expected result.
239    checkRowAndDelete(t, row1, 1);
240    checkRowAndDelete(t, row2, 0);
241    checkRowAndDelete(t, row3, 0);
242    // We have three Coprocessors stacked up on the prePut. See the beforeClass setup. We did three
243    // puts above two of which bypassed. A bypass means do not call the other coprocessors in the
244    // stack so for the two 'test' calls in the above, we should not have call through to all all
245    // three coprocessors in the chain. So we should have:
246    // 3 invocations for first put + 1 invocation + 1 bypass for second put + 1 invocation +
247    // 1 bypass for the last put. Assert.
248    assertEquals("Total CP invocation count", 5, TestCoprocessor.PREPUT_INVOCATIONS.get());
249    assertEquals("Total CP bypasses", 2, TestCoprocessor.PREPUT_BYPASSES.get());
250  }
251
252  public static class TestCoprocessor implements RegionCoprocessor, RegionObserver {
253    static AtomicInteger PREPUT_INVOCATIONS = new AtomicInteger(0);
254    static AtomicInteger PREPUT_BYPASSES = new AtomicInteger(0);
255
256    @Override
257    public Optional<RegionObserver> getRegionObserver() {
258      return Optional.of(this);
259    }
260
261    @Override
262    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
263      final WALEdit edit, final Durability durability) throws IOException {
264      PREPUT_INVOCATIONS.incrementAndGet();
265      Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
266      if (familyMap.containsKey(test)) {
267        PREPUT_BYPASSES.incrementAndGet();
268        e.bypass();
269      }
270    }
271  }
272
273  /**
274   * Calls through to TestCoprocessor.
275   */
276  public static class TestCoprocessor2 extends TestRegionObserverBypass.TestCoprocessor {
277  }
278
279  /**
280   * Calls through to TestCoprocessor.
281   */
282  public static class TestCoprocessor3 extends TestRegionObserverBypass.TestCoprocessor {
283  }
284}