001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.util.concurrent.CountDownLatch; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.atomic.AtomicLong; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.HBaseTestingUtility; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 036import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy; 037import org.apache.hadoop.hbase.client.backoff.ServerStatistics; 038import org.apache.hadoop.hbase.regionserver.HRegionServer; 039import org.apache.hadoop.hbase.regionserver.Region; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 042import org.junit.AfterClass; 043import org.junit.BeforeClass; 044import org.junit.Test; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * Test that we can actually send and use region metrics to slowdown client writes 050 */ 051public abstract class ClientPushbackTestBase { 052 053 private static final Logger LOG = LoggerFactory.getLogger(ClientPushbackTestBase.class); 054 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 055 056 protected static final TableName tableName = TableName.valueOf("client-pushback"); 057 private static final byte[] family = Bytes.toBytes("f"); 058 private static final byte[] qualifier = Bytes.toBytes("q"); 059 private static final long flushSizeBytes = 512; 060 061 @BeforeClass 062 public static void setupCluster() throws Exception { 063 Configuration conf = UTIL.getConfiguration(); 064 // enable backpressure 065 conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); 066 // use the exponential backoff policy 067 conf.setClass(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, ExponentialClientBackoffPolicy.class, 068 ClientBackoffPolicy.class); 069 // turn the memstore size way down so we don't need to write a lot to see changes in memstore 070 // load 071 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes); 072 // ensure we block the flushes when we are double that flushsize 073 conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 074 HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER); 075 conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true); 076 UTIL.startMiniCluster(1); 077 UTIL.createTable(tableName, family); 078 } 079 080 @AfterClass 081 public static void cleanupCluster() throws Exception { 082 UTIL.shutdownMiniCluster(); 083 } 084 085 protected abstract ClientBackoffPolicy getBackoffPolicy() throws IOException; 086 087 protected abstract ServerStatisticTracker getStatisticsTracker() throws IOException; 088 089 protected abstract MetricsConnection getConnectionMetrics() throws IOException; 090 091 protected abstract void mutate(Put put) throws IOException; 092 093 protected abstract void mutate(Put put, AtomicLong endTime, CountDownLatch latch) 094 throws IOException; 095 096 protected abstract void mutateRow(RowMutations mutations) throws IOException; 097 098 @Test 099 public void testClientTracksServerPushback() throws Exception { 100 HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0); 101 Region region = rs.getRegions(tableName).get(0); 102 103 LOG.debug("Writing some data to " + tableName); 104 // write some data 105 Put p = new Put(Bytes.toBytes("row")); 106 p.addColumn(family, qualifier, Bytes.toBytes("value1")); 107 mutate(p); 108 109 // get the current load on RS. Hopefully memstore isn't flushed since we wrote the the data 110 int load = (int) ((region.getMemStoreHeapSize() * 100) / flushSizeBytes); 111 LOG.debug("Done writing some data to " + tableName); 112 113 // get the stats for the region hosting our table 114 ClientBackoffPolicy backoffPolicy = getBackoffPolicy(); 115 assertTrue("Backoff policy is not correctly configured", 116 backoffPolicy instanceof ExponentialClientBackoffPolicy); 117 118 ServerStatisticTracker stats = getStatisticsTracker(); 119 assertNotNull("No stats configured for the client!", stats); 120 // get the names so we can query the stats 121 ServerName server = rs.getServerName(); 122 byte[] regionName = region.getRegionInfo().getRegionName(); 123 124 // check to see we found some load on the memstore 125 ServerStatistics serverStats = stats.getStats(server); 126 ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName); 127 assertEquals("We did not find some load on the memstore", load, 128 regionStats.getMemStoreLoadPercent()); 129 // check that the load reported produces a nonzero delay 130 long backoffTime = backoffPolicy.getBackoffTime(server, regionName, serverStats); 131 assertNotEquals("Reported load does not produce a backoff", 0, backoffTime); 132 LOG.debug("Backoff calculated for " + region.getRegionInfo().getRegionNameAsString() + " @ " 133 + server + " is " + backoffTime); 134 135 CountDownLatch latch = new CountDownLatch(1); 136 AtomicLong endTime = new AtomicLong(); 137 long startTime = EnvironmentEdgeManager.currentTime(); 138 mutate(p, endTime, latch); 139 // Currently the ExponentialClientBackoffPolicy under these test conditions 140 // produces a backoffTime of 151 milliseconds. This is long enough so the 141 // wait and related checks below are reasonable. Revisit if the backoff 142 // time reported by above debug logging has significantly deviated. 143 MetricsConnection metrics = getConnectionMetrics(); 144 String name = server.getServerName() + "," + Bytes.toStringBinary(regionName); 145 MetricsConnection.RegionStats rsStats = metrics.getServerStats().get(server).get(regionName); 146 assertEquals(name, rsStats.name); 147 assertEquals(rsStats.heapOccupancyHist.getSnapshot().getMean(), 148 (double) regionStats.getHeapOccupancyPercent(), 0.1); 149 assertEquals(rsStats.memstoreLoadHist.getSnapshot().getMean(), 150 (double) regionStats.getMemStoreLoadPercent(), 0.1); 151 152 MetricsConnection.RunnerStats runnerStats = metrics.getRunnerStats(); 153 154 assertEquals(1, runnerStats.delayRunners.getCount()); 155 assertEquals(1, runnerStats.normalRunners.getCount()); 156 assertEquals("", runnerStats.delayIntevalHist.getSnapshot().getMean(), (double) backoffTime, 157 0.1); 158 159 latch.await(backoffTime * 2, TimeUnit.MILLISECONDS); 160 assertNotEquals("AsyncProcess did not submit the work time", 0, endTime.get()); 161 assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime); 162 } 163 164 @Test 165 public void testMutateRowStats() throws IOException { 166 HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0); 167 Region region = rs.getRegions(tableName).get(0); 168 169 RowMutations mutations = new RowMutations(Bytes.toBytes("row")); 170 Put p = new Put(Bytes.toBytes("row")); 171 p.addColumn(family, qualifier, Bytes.toBytes("value2")); 172 mutations.add(p); 173 mutateRow(mutations); 174 175 ServerStatisticTracker stats = getStatisticsTracker(); 176 assertNotNull("No stats configured for the client!", stats); 177 // get the names so we can query the stats 178 ServerName server = rs.getServerName(); 179 byte[] regionName = region.getRegionInfo().getRegionName(); 180 181 // check to see we found some load on the memstore 182 ServerStatistics serverStats = stats.getStats(server); 183 ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName); 184 185 assertNotNull(regionStats); 186 assertTrue(regionStats.getMemStoreLoadPercent() > 0); 187 } 188}