/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor.example;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Exercises {@link ZooKeeperScanPolicyObserver} on a mini cluster. The test stores a timestamp
 * at {@link ZooKeeperScanPolicyObserver#NODE} and then checks that cells written with an older
 * timestamp are no longer readable after a flush or a major compaction, while newer cells
 * survive.
 */
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestZooKeeperScanPolicyObserver {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZooKeeperScanPolicyObserver.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static final TableName NAME = TableName.valueOf("TestCP");

  private static final byte[] FAMILY = Bytes.toBytes("cf");

  private static final byte[] QUALIFIER = Bytes.toBytes("cq");

  /** Handle on the test table; assigned in {@link #setUp()} and closed in {@link #tearDown()}. */
  private static Table TABLE;

  /**
   * Starts a mini cluster and creates the test table with the observer attached. The table
   * descriptor carries the mini ZooKeeper cluster address and a session timeout value of "2000"
   * under the observer's configuration keys.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.startMiniCluster(3);
    UTIL.getAdmin()
      .createTable(TableDescriptorBuilder.newBuilder(NAME)
        .setCoprocessor(ZooKeeperScanPolicyObserver.class.getName())
        .setValue(ZooKeeperScanPolicyObserver.ZK_ENSEMBLE_KEY,
          UTIL.getZkCluster().getAddress().toString())
        .setValue(ZooKeeperScanPolicyObserver.ZK_SESSION_TIMEOUT_KEY, "2000")
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build()).build());
    TABLE = UTIL.getConnection().getTable(NAME);
  }

  /**
   * Closes the table handle and shuts the mini cluster down. The shutdown runs in a
   * {@code finally} block so that a failure while closing the table cannot leave the cluster
   * running.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    try {
      if (TABLE != null) {
        TABLE.close();
      }
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Stores {@code time} (encoded with {@link Bytes#toBytes(long)}) at
   * {@link ZooKeeperScanPolicyObserver#NODE}, creating the node when it does not exist yet and
   * overwriting its data otherwise.
   * @param time the timestamp to publish on ZooKeeper
   */
  private void setExpireBefore(long time)
    throws KeeperException, InterruptedException, IOException {
    RecoverableZooKeeper recoverableZk = UTIL.getZooKeeperWatcher().getRecoverableZooKeeper();
    // we need to call this for setting up the zookeeper connection
    recoverableZk.reconnectAfterExpiration();
    // we have to use the original ZooKeeper as the RecoverableZooKeeper will append a magic prefix
    // for the data stored on zookeeper
    ZooKeeper zk = recoverableZk.getZooKeeper();
    byte[] data = Bytes.toBytes(time);
    if (zk.exists(ZooKeeperScanPolicyObserver.NODE, false) == null) {
      zk.create(ZooKeeperScanPolicyObserver.NODE, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT);
    } else {
      // version -1 matches any existing version of the node
      zk.setData(ZooKeeperScanPolicyObserver.NODE, data, -1);
    }
  }

  /**
   * Asserts that every row {@code i} in {@code [start, end)} holds the int value {@code i} in
   * the test column.
   */
  private void assertValueEquals(int start, int end) throws IOException {
    for (int i = start; i < end; i++) {
      assertEquals(i,
        Bytes.toInt(TABLE.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUALIFIER)));
    }
  }

  /** Asserts that no row {@code i} in {@code [start, end)} exists in the test table. */
  private void assertNotExists(int start, int end) throws IOException {
    for (int i = start; i < end; i++) {
      assertFalse(TABLE.exists(new Get(Bytes.toBytes(i))));
    }
  }

  /**
   * Writes rows {@code [start, end)} to the test column, each with row key and value
   * {@code Bytes.toBytes(i)} and the explicit cell timestamp {@code ts}.
   */
  private void put(int start, int end, long ts) throws IOException {
    for (int i = start; i < end; i++) {
      TABLE.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, ts, Bytes.toBytes(i)));
    }
  }

  /**
   * Checks expiry on flush first (all rows are older than the published timestamp), then
   * expiry on major compaction (only the older half of the rows is expected to disappear).
   */
  @Test
  public void test() throws IOException, KeeperException, InterruptedException {
    long now = EnvironmentEdgeManager.currentTime();
    put(0, 100, now - 10000);
    assertValueEquals(0, 100);

    setExpireBefore(now - 5000);
    // NOTE(review): presumably gives the observer time to pick up the new ZooKeeper value
    // before the flush; confirm against ZooKeeperScanPolicyObserver.
    Thread.sleep(5000);
    UTIL.getAdmin().flush(NAME);
    assertNotExists(0, 100);

    // two flushes so that the rows end up in two separate batches before the major compaction
    put(0, 50, now - 1000);
    UTIL.getAdmin().flush(NAME);
    put(50, 100, now - 100);
    UTIL.getAdmin().flush(NAME);
    assertValueEquals(0, 100);

    setExpireBefore(now - 500);
    // NOTE(review): same assumed propagation delay as above; confirm.
    Thread.sleep(5000);
    UTIL.getAdmin().majorCompact(NAME);
    // wait until the first region of the table reports a single store file for the family
    UTIL.waitFor(30000, () -> UTIL.getHBaseCluster().getRegions(NAME).iterator().next()
      .getStore(FAMILY).getStorefilesCount() == 1);
    assertNotExists(0, 50);
    assertValueEquals(50, 100);
  }
}