001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.Optional;
023import java.util.concurrent.atomic.AtomicLong;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HRegionLocation;
026import org.apache.hadoop.hbase.RegionLocations;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.ZooKeeperConnectionException;
030import org.apache.hadoop.hbase.coprocessor.ObserverContext;
031import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
032import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
033import org.apache.hadoop.hbase.coprocessor.RegionObserver;
034import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
035import org.apache.hadoop.hbase.util.Threads;
036import org.mockito.Mockito;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
040
041/**
042 * {@link Connection} testing utility.
043 */
044public class HConnectionTestingUtility {
045  /*
046   * Not part of {@link HBaseTestingUtility} because this class is not in same package as {@link
047   * ClusterConnection}. Would have to reveal ugly {@link ConnectionImplementation} innards to
048   * HBaseTestingUtility to give it access.
049   */
050  /**
051   * <<<<<<< HEAD Get a Mocked {@link ClusterConnection} that goes with the passed <code>conf</code>
052   * configuration instance. Minimally the mock will return &lt;code>conf&lt;/conf> when
053   * {@link ClusterConnection#getConfiguration()} is invoked. Be sure to shutdown the connection
054   * when done by calling {@link Connection#close()} else it will stick around; this is probably not
055   * what you want. ======= Get a Mocked {@link Connection} that goes with the passed
056   * <code>conf</code> configuration instance. Minimally the mock will return
057   * &lt;code>conf&lt;/conf> when {@link Connection#getConfiguration()} is invoked. Be sure to
058   * shutdown the connection when done by calling {@link Connection#close()} else it will stick
059   * around; this is probably not what you want. >>>>>>> fabf2b8282... HBASE-22572 Javadoc
060   * Warnings: @link reference not found (#306)
061   * @param conf configuration
062   * @return ClusterConnection object for <code>conf</code>
063   */
064  public static ClusterConnection getMockedConnection(final Configuration conf)
065    throws ZooKeeperConnectionException {
066    ConnectionConfiguration connectionConfig = new ConnectionConfiguration(conf);
067    ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class);
068    Mockito.when(connection.getConfiguration()).thenReturn(conf);
069    Mockito.when(connection.getConnectionConfiguration()).thenReturn(connectionConfig);
070    Mockito.when(connection.getRpcControllerFactory())
071      .thenReturn(Mockito.mock(RpcControllerFactory.class));
072    // we need a real retrying caller
073    RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf, connectionConfig);
074    Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory);
075    return connection;
076  }
077
078  /**
079   * Calls {@link #getMockedConnection(Configuration)} and then mocks a few more of the popular
080   * {@link ClusterConnection} methods so they do 'normal' operation (see return doc below for
081   * list). Be sure to shutdown the connection when done by calling {@link Connection#close()} else
082   * it will stick around; this is probably not what you want.
083   * @param conf   Configuration to use
084   * @param admin  An AdminProtocol; can be null but is usually itself a mock.
085   * @param client A ClientProtocol; can be null but is usually itself a mock.
086   * @param sn     ServerName to include in the region location returned by this
087   *               <code>connection</code>
088   * @param hri    RegionInfo to include in the location returned when getRegionLocator is called on
089   *               the mocked connection
090   * @return Mock up a connection that returns a {@link Configuration} when
091   *         {@link ClusterConnection#getConfiguration()} is called, a 'location' when
092   *         {@link ClusterConnection#getRegionLocation(org.apache.hadoop.hbase.TableName, byte[], boolean)}
093   *         is called, and that returns the passed
094   *         {@link AdminProtos.AdminService.BlockingInterface} instance when
095   *         {@link ClusterConnection#getAdmin(ServerName)} is called, returns the passed
096   *         {@link ClientProtos.ClientService.BlockingInterface} instance when
097   *         {@link ClusterConnection#getClient(ServerName)} is called (Be sure to call
098   *         {@link Connection#close()} when done with this mocked Connection.
099   */
100  public static ClusterConnection getMockedConnectionAndDecorate(final Configuration conf,
101    final AdminProtos.AdminService.BlockingInterface admin,
102    final ClientProtos.ClientService.BlockingInterface client, final ServerName sn,
103    final RegionInfo hri) throws IOException {
104    ConnectionImplementation c = Mockito.mock(ConnectionImplementation.class);
105    Mockito.when(c.getConfiguration()).thenReturn(conf);
106    ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration(conf);
107    Mockito.when(c.getConnectionConfiguration()).thenReturn(connectionConfiguration);
108    Mockito.doNothing().when(c).close();
109    // Make it so we return a particular location when asked.
110    final HRegionLocation loc = new HRegionLocation(hri, sn);
111    Mockito.when(
112      c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean()))
113      .thenReturn(loc);
114    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).thenReturn(loc);
115    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any(),
116      Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt()))
117      .thenReturn(new RegionLocations(loc));
118    if (admin != null) {
119      // If a call to getAdmin, return this implementation.
120      Mockito.when(c.getAdmin(Mockito.any())).thenReturn(admin);
121    }
122    if (client != null) {
123      // If a call to getClient, return this client.
124      Mockito.when(c.getClient(Mockito.any())).thenReturn(client);
125    }
126    NonceGenerator ng = Mockito.mock(NonceGenerator.class);
127    Mockito.when(c.getNonceGenerator()).thenReturn(ng);
128    AsyncProcess asyncProcess = new AsyncProcess(c, conf,
129      RpcRetryingCallerFactory.instantiate(conf, connectionConfiguration, c.getConnectionMetrics()),
130      RpcControllerFactory.instantiate(conf));
131    Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess);
132    Mockito.when(c.getNewRpcRetryingCallerFactory(conf))
133      .thenReturn(RpcRetryingCallerFactory.instantiate(conf, connectionConfiguration,
134        RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null));
135    Mockito.when(c.getRpcControllerFactory()).thenReturn(Mockito.mock(RpcControllerFactory.class));
136    Table t = Mockito.mock(Table.class);
137    Mockito.when(c.getTable((TableName) Mockito.any())).thenReturn(t);
138    ResultScanner rs = Mockito.mock(ResultScanner.class);
139    Mockito.when(t.getScanner((Scan) Mockito.any())).thenReturn(rs);
140    return c;
141  }
142
143  /**
144   * Get a Mockito spied-upon {@link ClusterConnection} that goes with the passed <code>conf</code>
145   * configuration instance. Be sure to shutdown the connection when done by calling
146   * {@link Connection#close()} else it will stick around; this is probably not what you want.
147   * @param conf configuration
148   * @return ClusterConnection object for <code>conf</code> [Dead link]: See also
149   *         {http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html#spy(T)}
150   */
151  public static ClusterConnection getSpiedConnection(final Configuration conf) throws IOException {
152    ConnectionImplementation connection =
153      Mockito.spy(new ConnectionImplementation(conf, null, null, Collections.emptyMap()));
154    return connection;
155  }
156
157  /**
158   * This coproceesor sleep 2s at first increment/append rpc call.
159   */
160  public static class SleepAtFirstRpcCall implements RegionCoprocessor, RegionObserver {
161    static final AtomicLong ct = new AtomicLong(0);
162    static final String SLEEP_TIME_CONF_KEY = "hbase.coprocessor.SleepAtFirstRpcCall.sleepTime";
163    static final long DEFAULT_SLEEP_TIME = 2000;
164    static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);
165
166    @Override
167    public Optional<RegionObserver> getRegionObserver() {
168      return Optional.of(this);
169    }
170
171    public SleepAtFirstRpcCall() {
172    }
173
174    @Override
175    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
176      RegionCoprocessorEnvironment env = c.getEnvironment();
177      Configuration conf = env.getConfiguration();
178      sleepTime.set(conf.getLong(SLEEP_TIME_CONF_KEY, DEFAULT_SLEEP_TIME));
179    }
180
181    @Override
182    public Result postIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
183      final Increment increment, final Result result) throws IOException {
184      if (ct.incrementAndGet() == 1) {
185        Threads.sleep(sleepTime.get());
186      }
187      return result;
188    }
189
190    @Override
191    public Result postAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
192      final Append append, final Result result) throws IOException {
193      if (ct.incrementAndGet() == 1) {
194        Threads.sleep(sleepTime.get());
195      }
196      return result;
197    }
198  }
199}