001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertThrows; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.List; 030import java.util.concurrent.CompletableFuture; 031import java.util.concurrent.TimeUnit; 032import java.util.stream.Collectors; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.regionserver.HRegion; 037import org.apache.hadoop.hbase.regionserver.HRegionServer; 038import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 039import org.apache.hadoop.hbase.testclassification.ClientTests; 040import org.apache.hadoop.hbase.testclassification.MediumTests; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hadoop.hbase.util.FutureUtils; 043import org.apache.hadoop.hbase.util.JVMClusterUtil; 044import org.junit.After; 045import org.junit.AfterClass; 046import org.junit.Before; 047import org.junit.BeforeClass; 048import org.junit.ClassRule; 049import org.junit.Rule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052import org.junit.rules.TestName; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 057 058@Category({ MediumTests.class, ClientTests.class }) 059public class TestFlushFromClient { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestFlushFromClient.class); 064 065 private static final Logger LOG = LoggerFactory.getLogger(TestFlushFromClient.class); 066 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 067 private static AsyncConnection asyncConn; 068 private static final byte[][] SPLITS = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("7") }; 069 private static final List<byte[]> ROWS = 070 Arrays.asList(Bytes.toBytes("1"), Bytes.toBytes("4"), Bytes.toBytes("8")); 071 private static final byte[] FAMILY_1 = Bytes.toBytes("f1"); 072 private static final byte[] FAMILY_2 = Bytes.toBytes("f2"); 073 public static final byte[][] FAMILIES = { FAMILY_1, FAMILY_2 }; 074 @Rule 075 public TestName name = new TestName(); 076 077 public TableName tableName; 078 079 @BeforeClass 080 public static void setUpBeforeClass() throws Exception { 081 TEST_UTIL.startMiniCluster(ROWS.size()); 082 asyncConn = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 083 } 084 085 @AfterClass 086 public static void tearDownAfterClass() throws Exception { 087 Closeables.close(asyncConn, true); 088 TEST_UTIL.shutdownMiniCluster(); 089 } 090 091 @Before 092 public void setUp() throws Exception { 093 tableName = TableName.valueOf(name.getMethodName()); 094 try (Table t = TEST_UTIL.createTable(tableName, FAMILIES, SPLITS)) { 095 List<Put> puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList()); 096 for (int i = 0; i != 20; ++i) { 097 byte[] value = Bytes.toBytes(i); 098 puts.forEach(p -> { 099 p.addColumn(FAMILY_1, value, value); 100 p.addColumn(FAMILY_2, value, value); 101 }); 102 } 103 t.put(puts); 104 } 105 assertFalse(getRegionInfo().isEmpty()); 106 assertTrue(getRegionInfo().stream().allMatch(r -> r.getMemStoreDataSize() != 0)); 107 } 108 109 @After 110 public void tearDown() throws Exception { 111 for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) { 112 LOG.info("Tear down, remove table=" + htd.getTableName()); 113 TEST_UTIL.deleteTable(htd.getTableName()); 114 } 115 } 116 117 @Test 118 public void testFlushTable() throws Exception { 119 try (Admin admin = TEST_UTIL.getAdmin()) { 120 admin.flush(tableName); 121 assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 122 } 123 } 124 125 @Test 126 public void testFlushTableFamily() throws Exception { 127 try (Admin admin = TEST_UTIL.getAdmin()) { 128 long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize(); 129 admin.flush(tableName, FAMILY_1); 130 assertFalse( 131 getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2)); 132 } 133 } 134 135 @Test 136 public void testAsyncFlushTable() throws Exception { 137 AsyncAdmin admin = asyncConn.getAdmin(); 138 admin.flush(tableName).get(); 139 assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 140 } 141 142 @Test 143 public void testAsyncFlushTableFamily() throws Exception { 144 AsyncAdmin admin = asyncConn.getAdmin(); 145 long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize(); 146 admin.flush(tableName, FAMILY_1).get(); 147 assertFalse( 148 getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2)); 149 } 150 151 @Test 152 public void testFlushRegion() throws Exception { 153 try (Admin admin = TEST_UTIL.getAdmin()) { 154 for (HRegion r : getRegionInfo()) { 155 admin.flushRegion(r.getRegionInfo().getRegionName()); 156 TimeUnit.SECONDS.sleep(1); 157 assertEquals(0, r.getMemStoreDataSize()); 158 } 159 } 160 } 161 162 @Test 163 public void testFlushRegionFamily() throws Exception { 164 try (Admin admin = TEST_UTIL.getAdmin()) { 165 for (HRegion r : getRegionInfo()) { 166 long sizeBeforeFlush = r.getMemStoreDataSize(); 167 admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1); 168 TimeUnit.SECONDS.sleep(1); 169 assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize()); 170 } 171 } 172 } 173 174 @Test 175 public void testAsyncFlushRegion() throws Exception { 176 AsyncAdmin admin = asyncConn.getAdmin(); 177 for (HRegion r : getRegionInfo()) { 178 admin.flushRegion(r.getRegionInfo().getRegionName()).get(); 179 TimeUnit.SECONDS.sleep(1); 180 assertEquals(0, r.getMemStoreDataSize()); 181 } 182 } 183 184 @Test 185 public void testAsyncFlushRegionFamily() throws Exception { 186 AsyncAdmin admin = asyncConn.getAdmin(); 187 for (HRegion r : getRegionInfo()) { 188 long sizeBeforeFlush = r.getMemStoreDataSize(); 189 admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1).get(); 190 TimeUnit.SECONDS.sleep(1); 191 assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize()); 192 } 193 } 194 195 @Test 196 public void testAsyncFlushTableWithNonExistingFamilies() throws IOException { 197 AsyncAdmin admin = asyncConn.getAdmin(); 198 List<byte[]> families = new ArrayList<>(); 199 families.add(FAMILY_1); 200 families.add(FAMILY_2); 201 families.add(Bytes.toBytes("non_family01")); 202 families.add(Bytes.toBytes("non_family02")); 203 CompletableFuture<Void> future = CompletableFuture.allOf(admin.flush(tableName, families)); 204 assertThrows(NoSuchColumnFamilyException.class, () -> FutureUtils.get(future)); 205 } 206 207 @Test 208 public void testAsyncFlushRegionWithNonExistingFamily() throws IOException { 209 AsyncAdmin admin = asyncConn.getAdmin(); 210 List<HRegion> regions = getRegionInfo(); 211 assertNotNull(regions); 212 assertTrue(regions.size() > 0); 213 HRegion region = regions.get(0); 214 CompletableFuture<Void> future = CompletableFuture.allOf(admin 215 .flushRegion(region.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes("non_family"))); 216 assertThrows(NoSuchColumnFamilyException.class, () -> FutureUtils.get(future)); 217 } 218 219 @Test 220 public void testFlushRegionServer() throws Exception { 221 try (Admin admin = TEST_UTIL.getAdmin()) { 222 for (HRegionServer rs : TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 223 .map(JVMClusterUtil.RegionServerThread::getRegionServer).collect(Collectors.toList())) { 224 admin.flushRegionServer(rs.getServerName()); 225 assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 226 } 227 } 228 } 229 230 @Test 231 public void testAsyncFlushRegionServer() throws Exception { 232 AsyncAdmin admin = asyncConn.getAdmin(); 233 for (HRegionServer rs : TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 234 .map(JVMClusterUtil.RegionServerThread::getRegionServer).collect(Collectors.toList())) { 235 admin.flushRegionServer(rs.getServerName()).get(); 236 assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 237 } 238 } 239 240 private List<HRegion> getRegionInfo() { 241 return TEST_UTIL.getHBaseCluster().getRegions(tableName); 242 } 243 244 private List<HRegion> getRegionInfo(HRegionServer rs) { 245 return rs.getRegions().stream() 246 .filter(v -> v.getTableDescriptor().getTableName().equals(tableName)) 247 .collect(Collectors.toList()); 248 } 249}