001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertThrows;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.TimeUnit;
032import java.util.stream.Collectors;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.regionserver.HRegion;
037import org.apache.hadoop.hbase.regionserver.HRegionServer;
038import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
039import org.apache.hadoop.hbase.testclassification.ClientTests;
040import org.apache.hadoop.hbase.testclassification.MediumTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.FutureUtils;
043import org.apache.hadoop.hbase.util.JVMClusterUtil;
044import org.junit.After;
045import org.junit.AfterClass;
046import org.junit.Before;
047import org.junit.BeforeClass;
048import org.junit.ClassRule;
049import org.junit.Rule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.junit.rules.TestName;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
057
058@Category({ MediumTests.class, ClientTests.class })
059public class TestFlushFromClient {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestFlushFromClient.class);
064
065  private static final Logger LOG = LoggerFactory.getLogger(TestFlushFromClient.class);
066  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
067  private static AsyncConnection asyncConn;
068  private static final byte[][] SPLITS = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("7") };
069  private static final List<byte[]> ROWS =
070    Arrays.asList(Bytes.toBytes("1"), Bytes.toBytes("4"), Bytes.toBytes("8"));
071  private static final byte[] FAMILY_1 = Bytes.toBytes("f1");
072  private static final byte[] FAMILY_2 = Bytes.toBytes("f2");
073  public static final byte[][] FAMILIES = { FAMILY_1, FAMILY_2 };
074  @Rule
075  public TestName name = new TestName();
076
077  public TableName tableName;
078
079  @BeforeClass
080  public static void setUpBeforeClass() throws Exception {
081    TEST_UTIL.startMiniCluster(ROWS.size());
082    asyncConn = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
083  }
084
085  @AfterClass
086  public static void tearDownAfterClass() throws Exception {
087    Closeables.close(asyncConn, true);
088    TEST_UTIL.shutdownMiniCluster();
089  }
090
091  @Before
092  public void setUp() throws Exception {
093    tableName = TableName.valueOf(name.getMethodName());
094    try (Table t = TEST_UTIL.createTable(tableName, FAMILIES, SPLITS)) {
095      List<Put> puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList());
096      for (int i = 0; i != 20; ++i) {
097        byte[] value = Bytes.toBytes(i);
098        puts.forEach(p -> {
099          p.addColumn(FAMILY_1, value, value);
100          p.addColumn(FAMILY_2, value, value);
101        });
102      }
103      t.put(puts);
104    }
105    assertFalse(getRegionInfo().isEmpty());
106    assertTrue(getRegionInfo().stream().allMatch(r -> r.getMemStoreDataSize() != 0));
107  }
108
109  @After
110  public void tearDown() throws Exception {
111    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
112      LOG.info("Tear down, remove table=" + htd.getTableName());
113      TEST_UTIL.deleteTable(htd.getTableName());
114    }
115  }
116
117  @Test
118  public void testFlushTable() throws Exception {
119    try (Admin admin = TEST_UTIL.getAdmin()) {
120      admin.flush(tableName);
121      assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
122    }
123  }
124
125  @Test
126  public void testFlushTableFamily() throws Exception {
127    try (Admin admin = TEST_UTIL.getAdmin()) {
128      long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
129      admin.flush(tableName, FAMILY_1);
130      assertFalse(
131        getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
132    }
133  }
134
135  @Test
136  public void testAsyncFlushTable() throws Exception {
137    AsyncAdmin admin = asyncConn.getAdmin();
138    admin.flush(tableName).get();
139    assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
140  }
141
142  @Test
143  public void testAsyncFlushTableFamily() throws Exception {
144    AsyncAdmin admin = asyncConn.getAdmin();
145    long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
146    admin.flush(tableName, FAMILY_1).get();
147    assertFalse(
148      getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
149  }
150
151  @Test
152  public void testFlushRegion() throws Exception {
153    try (Admin admin = TEST_UTIL.getAdmin()) {
154      for (HRegion r : getRegionInfo()) {
155        admin.flushRegion(r.getRegionInfo().getRegionName());
156        TimeUnit.SECONDS.sleep(1);
157        assertEquals(0, r.getMemStoreDataSize());
158      }
159    }
160  }
161
162  @Test
163  public void testFlushRegionFamily() throws Exception {
164    try (Admin admin = TEST_UTIL.getAdmin()) {
165      for (HRegion r : getRegionInfo()) {
166        long sizeBeforeFlush = r.getMemStoreDataSize();
167        admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1);
168        TimeUnit.SECONDS.sleep(1);
169        assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize());
170      }
171    }
172  }
173
174  @Test
175  public void testAsyncFlushRegion() throws Exception {
176    AsyncAdmin admin = asyncConn.getAdmin();
177    for (HRegion r : getRegionInfo()) {
178      admin.flushRegion(r.getRegionInfo().getRegionName()).get();
179      TimeUnit.SECONDS.sleep(1);
180      assertEquals(0, r.getMemStoreDataSize());
181    }
182  }
183
184  @Test
185  public void testAsyncFlushRegionFamily() throws Exception {
186    AsyncAdmin admin = asyncConn.getAdmin();
187    for (HRegion r : getRegionInfo()) {
188      long sizeBeforeFlush = r.getMemStoreDataSize();
189      admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1).get();
190      TimeUnit.SECONDS.sleep(1);
191      assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize());
192    }
193  }
194
195  @Test
196  public void testAsyncFlushTableWithNonExistingFamilies() throws IOException {
197    AsyncAdmin admin = asyncConn.getAdmin();
198    List<byte[]> families = new ArrayList<>();
199    families.add(FAMILY_1);
200    families.add(FAMILY_2);
201    families.add(Bytes.toBytes("non_family01"));
202    families.add(Bytes.toBytes("non_family02"));
203    CompletableFuture<Void> future = CompletableFuture.allOf(admin.flush(tableName, families));
204    assertThrows(NoSuchColumnFamilyException.class, () -> FutureUtils.get(future));
205  }
206
207  @Test
208  public void testAsyncFlushRegionWithNonExistingFamily() throws IOException {
209    AsyncAdmin admin = asyncConn.getAdmin();
210    List<HRegion> regions = getRegionInfo();
211    assertNotNull(regions);
212    assertTrue(regions.size() > 0);
213    HRegion region = regions.get(0);
214    CompletableFuture<Void> future = CompletableFuture.allOf(admin
215      .flushRegion(region.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes("non_family")));
216    assertThrows(NoSuchColumnFamilyException.class, () -> FutureUtils.get(future));
217  }
218
219  @Test
220  public void testFlushRegionServer() throws Exception {
221    try (Admin admin = TEST_UTIL.getAdmin()) {
222      for (HRegionServer rs : TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
223        .map(JVMClusterUtil.RegionServerThread::getRegionServer).collect(Collectors.toList())) {
224        admin.flushRegionServer(rs.getServerName());
225        assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
226      }
227    }
228  }
229
230  @Test
231  public void testAsyncFlushRegionServer() throws Exception {
232    AsyncAdmin admin = asyncConn.getAdmin();
233    for (HRegionServer rs : TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
234      .map(JVMClusterUtil.RegionServerThread::getRegionServer).collect(Collectors.toList())) {
235      admin.flushRegionServer(rs.getServerName()).get();
236      assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
237    }
238  }
239
240  private List<HRegion> getRegionInfo() {
241    return TEST_UTIL.getHBaseCluster().getRegions(tableName);
242  }
243
244  private List<HRegion> getRegionInfo(HRegionServer rs) {
245    return rs.getRegions().stream()
246      .filter(v -> v.getTableDescriptor().getTableName().equals(tableName))
247      .collect(Collectors.toList());
248  }
249}