001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.security.PrivilegedAction;
022import java.util.ArrayList;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.hbase.client.RegionInfoBuilder;
029import org.apache.hadoop.hbase.client.RegionReplicaUtil;
030import org.apache.hadoop.hbase.master.HMaster;
031import org.apache.hadoop.hbase.regionserver.HRegion;
032import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
033import org.apache.hadoop.hbase.regionserver.HRegionServer;
034import org.apache.hadoop.hbase.regionserver.Region;
035import org.apache.hadoop.hbase.security.User;
036import org.apache.hadoop.hbase.test.MetricsAssertHelper;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.JVMClusterUtil;
039import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
041import org.apache.hadoop.hbase.util.Threads;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.apache.yetus.audience.InterfaceStability;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
048
049/**
050 * This class creates a single process HBase cluster. each server. The master uses the 'default'
051 * FileSystem. The RegionServers, if we are running on DistributedFilesystem, create a FileSystem
052 * instance each and will close down their instance on the way out.
053 */
054@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
055@InterfaceStability.Evolving
056public class SingleProcessHBaseCluster extends HBaseClusterInterface {
057  private static final Logger LOG =
058    LoggerFactory.getLogger(SingleProcessHBaseCluster.class.getName());
059  public LocalHBaseCluster hbaseCluster;
060  private static int index;
061
062  /**
063   * Start a MiniHBaseCluster.
064   * @param conf             Configuration to be used for cluster
065   * @param numRegionServers initial number of region servers to start.
066   */
067  public SingleProcessHBaseCluster(Configuration conf, int numRegionServers)
068    throws IOException, InterruptedException {
069    this(conf, 1, numRegionServers);
070  }
071
072  /**
073   * Start a MiniHBaseCluster.
074   * @param conf             Configuration to be used for cluster
075   * @param numMasters       initial number of masters to start.
076   * @param numRegionServers initial number of region servers to start.
077   */
078  public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numRegionServers)
079    throws IOException, InterruptedException {
080    this(conf, numMasters, numRegionServers, null, null);
081  }
082
083  /**
084   * Start a MiniHBaseCluster.
085   * @param conf             Configuration to be used for cluster
086   * @param numMasters       initial number of masters to start.
087   * @param numRegionServers initial number of region servers to start.
088   */
089  public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
090    Class<? extends HMaster> masterClass,
091    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
092    throws IOException, InterruptedException {
093    this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass);
094  }
095
096  /**
097   * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster
098   *                restart where for sure the regionservers come up on same address+port (but just
099   *                with different startcode); by default mini hbase clusters choose new arbitrary
100   *                ports on each cluster start.
101   */
102  public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
103    int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
104    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
105    throws IOException, InterruptedException {
106    super(conf);
107
108    // Hadoop 2
109    CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
110
111    init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass,
112      regionserverClass);
113    this.initialClusterStatus = getClusterMetrics();
114  }
115
116  public Configuration getConfiguration() {
117    return this.conf;
118  }
119
120  /**
121   * Subclass so can get at protected methods (none at moment). Also, creates a FileSystem instance
122   * per instantiation. Adds a shutdown own FileSystem on the way out. Shuts down own Filesystem
123   * only, not All filesystems as the FileSystem system exit hook does.
124   */
125  public static class MiniHBaseClusterRegionServer extends HRegionServer {
126    private Thread shutdownThread = null;
127    private User user = null;
128    /**
129     * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any
130     * restarted instances of the same server will have different ServerName and will not coincide
131     * with past dead ones. So there's no need to cleanup this list.
132     */
133    static Set<ServerName> killedServers = new HashSet<>();
134
135    public MiniHBaseClusterRegionServer(Configuration conf)
136      throws IOException, InterruptedException {
137      super(conf);
138      this.user = User.getCurrent();
139    }
140
141    @Override
142    protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
143      throws IOException {
144      super.handleReportForDutyResponse(c);
145      // Run this thread to shutdown our filesystem on way out.
146      this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
147    }
148
149    @Override
150    public void run() {
151      try {
152        this.user.runAs(new PrivilegedAction<Object>() {
153          @Override
154          public Object run() {
155            runRegionServer();
156            return null;
157          }
158        });
159      } catch (Throwable t) {
160        LOG.error("Exception in run", t);
161      } finally {
162        // Run this on the way out.
163        if (this.shutdownThread != null) {
164          this.shutdownThread.start();
165          Threads.shutdown(this.shutdownThread, 30000);
166        }
167      }
168    }
169
170    private void runRegionServer() {
171      super.run();
172    }
173
174    @Override
175    protected void kill() {
176      killedServers.add(getServerName());
177      super.kill();
178    }
179
180    @Override
181    public void abort(final String reason, final Throwable cause) {
182      this.user.runAs(new PrivilegedAction<Object>() {
183        @Override
184        public Object run() {
185          abortRegionServer(reason, cause);
186          return null;
187        }
188      });
189    }
190
191    private void abortRegionServer(String reason, Throwable cause) {
192      super.abort(reason, cause);
193    }
194  }
195
196  /**
197   * Alternate shutdown hook. Just shuts down the passed fs, not all as default filesystem hook
198   * does.
199   */
200  static class SingleFileSystemShutdownThread extends Thread {
201    private final FileSystem fs;
202
203    SingleFileSystemShutdownThread(final FileSystem fs) {
204      super("Shutdown of " + fs);
205      this.fs = fs;
206    }
207
208    @Override
209    public void run() {
210      try {
211        LOG.info("Hook closing fs=" + this.fs);
212        this.fs.close();
213      } catch (IOException e) {
214        LOG.warn("Running hook", e);
215      }
216    }
217  }
218
219  private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
220    final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
221    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
222    throws IOException, InterruptedException {
223    try {
224      if (masterClass == null) {
225        masterClass = HMaster.class;
226      }
227      if (regionserverClass == null) {
228        regionserverClass = SingleProcessHBaseCluster.MiniHBaseClusterRegionServer.class;
229      }
230
231      // start up a LocalHBaseCluster
232      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0,
233        masterClass, regionserverClass);
234
235      // manually add the regionservers as other users
236      for (int i = 0; i < nRegionNodes; i++) {
237        Configuration rsConf = HBaseConfiguration.create(conf);
238        if (rsPorts != null) {
239          rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i));
240        }
241        User user = HBaseTestingUtil.getDifferentUser(rsConf, ".hfs." + index++);
242        hbaseCluster.addRegionServer(rsConf, i, user);
243      }
244
245      hbaseCluster.startup();
246    } catch (IOException e) {
247      shutdown();
248      throw e;
249    } catch (Throwable t) {
250      LOG.error("Error starting cluster", t);
251      shutdown();
252      throw new IOException("Shutting down", t);
253    }
254  }
255
256  @Override
257  public void startRegionServer(String hostname, int port) throws IOException {
258    final Configuration newConf = HBaseConfiguration.create(conf);
259    newConf.setInt(HConstants.REGIONSERVER_PORT, port);
260    startRegionServer(newConf);
261  }
262
263  @Override
264  public void killRegionServer(ServerName serverName) throws IOException {
265    HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
266    if (server instanceof MiniHBaseClusterRegionServer) {
267      LOG.info("Killing " + server.toString());
268      ((MiniHBaseClusterRegionServer) server).kill();
269    } else {
270      abortRegionServer(getRegionServerIndex(serverName));
271    }
272  }
273
274  @Override
275  public boolean isKilledRS(ServerName serverName) {
276    return MiniHBaseClusterRegionServer.killedServers.contains(serverName);
277  }
278
279  @Override
280  public void stopRegionServer(ServerName serverName) throws IOException {
281    stopRegionServer(getRegionServerIndex(serverName));
282  }
283
284  @Override
285  public void suspendRegionServer(ServerName serverName) throws IOException {
286    suspendRegionServer(getRegionServerIndex(serverName));
287  }
288
289  @Override
290  public void resumeRegionServer(ServerName serverName) throws IOException {
291    resumeRegionServer(getRegionServerIndex(serverName));
292  }
293
294  @Override
295  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
296    // ignore timeout for now
297    waitOnRegionServer(getRegionServerIndex(serverName));
298  }
299
300  @Override
301  public void waitForRegionServerToSuspend(ServerName serverName, long timeout) throws IOException {
302    LOG.warn("Waiting for regionserver to suspend on mini cluster is not supported");
303  }
304
305  @Override
306  public void waitForRegionServerToResume(ServerName serverName, long timeout) throws IOException {
307    LOG.warn("Waiting for regionserver to resume on mini cluster is not supported");
308  }
309
310  @Override
311  public void startZkNode(String hostname, int port) throws IOException {
312    LOG.warn("Starting zookeeper nodes on mini cluster is not supported");
313  }
314
315  @Override
316  public void killZkNode(ServerName serverName) throws IOException {
317    LOG.warn("Aborting zookeeper nodes on mini cluster is not supported");
318  }
319
320  @Override
321  public void stopZkNode(ServerName serverName) throws IOException {
322    LOG.warn("Stopping zookeeper nodes on mini cluster is not supported");
323  }
324
325  @Override
326  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
327    LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported");
328  }
329
330  @Override
331  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
332    LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported");
333  }
334
335  @Override
336  public void startDataNode(ServerName serverName) throws IOException {
337    LOG.warn("Starting datanodes on mini cluster is not supported");
338  }
339
340  @Override
341  public void killDataNode(ServerName serverName) throws IOException {
342    LOG.warn("Aborting datanodes on mini cluster is not supported");
343  }
344
345  @Override
346  public void stopDataNode(ServerName serverName) throws IOException {
347    LOG.warn("Stopping datanodes on mini cluster is not supported");
348  }
349
350  @Override
351  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
352    LOG.warn("Waiting for datanodes to start on mini cluster is not supported");
353  }
354
355  @Override
356  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
357    LOG.warn("Waiting for datanodes to stop on mini cluster is not supported");
358  }
359
360  @Override
361  public void startNameNode(ServerName serverName) throws IOException {
362    LOG.warn("Starting namenodes on mini cluster is not supported");
363  }
364
365  @Override
366  public void killNameNode(ServerName serverName) throws IOException {
367    LOG.warn("Aborting namenodes on mini cluster is not supported");
368  }
369
370  @Override
371  public void stopNameNode(ServerName serverName) throws IOException {
372    LOG.warn("Stopping namenodes on mini cluster is not supported");
373  }
374
375  @Override
376  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
377    LOG.warn("Waiting for namenodes to start on mini cluster is not supported");
378  }
379
380  @Override
381  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
382    LOG.warn("Waiting for namenodes to stop on mini cluster is not supported");
383  }
384
385  @Override
386  public void startJournalNode(ServerName serverName) {
387    LOG.warn("Starting journalnodes on mini cluster is not supported");
388  }
389
390  @Override
391  public void killJournalNode(ServerName serverName) {
392    LOG.warn("Aborting journalnodes on mini cluster is not supported");
393  }
394
395  @Override
396  public void stopJournalNode(ServerName serverName) {
397    LOG.warn("Stopping journalnodes on mini cluster is not supported");
398  }
399
400  @Override
401  public void waitForJournalNodeToStart(ServerName serverName, long timeout) {
402    LOG.warn("Waiting for journalnodes to start on mini cluster is not supported");
403  }
404
405  @Override
406  public void waitForJournalNodeToStop(ServerName serverName, long timeout) {
407    LOG.warn("Waiting for journalnodes to stop on mini cluster is not supported");
408  }
409
410  @Override
411  public void startMaster(String hostname, int port) throws IOException {
412    this.startMaster();
413  }
414
415  @Override
416  public void killMaster(ServerName serverName) throws IOException {
417    abortMaster(getMasterIndex(serverName));
418  }
419
420  @Override
421  public void stopMaster(ServerName serverName) throws IOException {
422    stopMaster(getMasterIndex(serverName));
423  }
424
425  @Override
426  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
427    // ignore timeout for now
428    waitOnMaster(getMasterIndex(serverName));
429  }
430
431  /**
432   * Starts a region server thread running
433   * @return New RegionServerThread
434   */
435  public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException {
436    final Configuration newConf = HBaseConfiguration.create(conf);
437    return startRegionServer(newConf);
438  }
439
440  private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration)
441    throws IOException {
442    User rsUser = HBaseTestingUtil.getDifferentUser(configuration, ".hfs." + index++);
443    JVMClusterUtil.RegionServerThread t = null;
444    try {
445      t =
446        hbaseCluster.addRegionServer(configuration, hbaseCluster.getRegionServers().size(), rsUser);
447      t.start();
448      t.waitForServerOnline();
449    } catch (InterruptedException ie) {
450      throw new IOException("Interrupted adding regionserver to cluster", ie);
451    }
452    return t;
453  }
454
455  /**
456   * Starts a region server thread and waits until its processed by master. Throws an exception when
457   * it can't start a region server or when the region server is not processed by master within the
458   * timeout.
459   * @return New RegionServerThread
460   */
461  public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout)
462    throws IOException {
463
464    JVMClusterUtil.RegionServerThread t = startRegionServer();
465    ServerName rsServerName = t.getRegionServer().getServerName();
466
467    long start = EnvironmentEdgeManager.currentTime();
468    ClusterMetrics clusterStatus = getClusterMetrics();
469    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
470      if (clusterStatus != null && clusterStatus.getLiveServerMetrics().containsKey(rsServerName)) {
471        return t;
472      }
473      Threads.sleep(100);
474    }
475    if (t.getRegionServer().isOnline()) {
476      throw new IOException("RS: " + rsServerName + " online, but not processed by master");
477    } else {
478      throw new IOException("RS: " + rsServerName + " is offline");
479    }
480  }
481
482  /**
483   * Cause a region server to exit doing basic clean up only on its way out.
484   * @param serverNumber Used as index into a list.
485   */
486  public String abortRegionServer(int serverNumber) {
487    HRegionServer server = getRegionServer(serverNumber);
488    LOG.info("Aborting " + server.toString());
489    server.abort("Aborting for tests", new Exception("Trace info"));
490    return server.toString();
491  }
492
493  /**
494   * Shut down the specified region server cleanly
495   * @param serverNumber Used as index into a list.
496   * @return the region server that was stopped
497   */
498  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
499    return stopRegionServer(serverNumber, true);
500  }
501
502  /**
503   * Shut down the specified region server cleanly
504   * @param serverNumber Used as index into a list.
505   * @param shutdownFS   True is we are to shutdown the filesystem as part of this regionserver's
506   *                     shutdown. Usually we do but you do not want to do this if you are running
507   *                     multiple regionservers in a test and you shut down one before end of the
508   *                     test.
509   * @return the region server that was stopped
510   */
511  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
512    final boolean shutdownFS) {
513    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
514    LOG.info("Stopping " + server.toString());
515    server.getRegionServer().stop("Stopping rs " + serverNumber);
516    return server;
517  }
518
519  /**
520   * Suspend the specified region server
521   * @param serverNumber Used as index into a list.
522   */
523  public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) {
524    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
525    LOG.info("Suspending {}", server.toString());
526    server.suspend();
527    return server;
528  }
529
530  /**
531   * Resume the specified region server
532   * @param serverNumber Used as index into a list.
533   */
534  public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) {
535    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
536    LOG.info("Resuming {}", server.toString());
537    server.resume();
538    return server;
539  }
540
541  /**
542   * Wait for the specified region server to stop. Removes this thread from list of running threads.
543   * @return Name of region server that just went down.
544   */
545  public String waitOnRegionServer(final int serverNumber) {
546    return this.hbaseCluster.waitOnRegionServer(serverNumber);
547  }
548
549  /**
550   * Starts a master thread running
551   * @return New RegionServerThread
552   */
553  public JVMClusterUtil.MasterThread startMaster() throws IOException {
554    Configuration c = HBaseConfiguration.create(conf);
555    User user = HBaseTestingUtil.getDifferentUser(c, ".hfs." + index++);
556
557    JVMClusterUtil.MasterThread t = null;
558    try {
559      t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
560      t.start();
561    } catch (InterruptedException ie) {
562      throw new IOException("Interrupted adding master to cluster", ie);
563    }
564    conf.set(HConstants.MASTER_ADDRS_KEY,
565      hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY));
566    return t;
567  }
568
569  /**
570   * Returns the current active master, if available.
571   * @return the active HMaster, null if none is active.
572   */
573  public HMaster getMaster() {
574    return this.hbaseCluster.getActiveMaster();
575  }
576
577  /**
578   * Returns the current active master thread, if available.
579   * @return the active MasterThread, null if none is active.
580   */
581  public MasterThread getMasterThread() {
582    for (MasterThread mt : hbaseCluster.getLiveMasters()) {
583      if (mt.getMaster().isActiveMaster()) {
584        return mt;
585      }
586    }
587    return null;
588  }
589
590  /**
591   * Returns the master at the specified index, if available.
592   * @return the active HMaster, null if none is active.
593   */
594  public HMaster getMaster(final int serverNumber) {
595    return this.hbaseCluster.getMaster(serverNumber);
596  }
597
598  /**
599   * Cause a master to exit without shutting down entire cluster.
600   * @param serverNumber Used as index into a list.
601   */
602  public String abortMaster(int serverNumber) {
603    HMaster server = getMaster(serverNumber);
604    LOG.info("Aborting " + server.toString());
605    server.abort("Aborting for tests", new Exception("Trace info"));
606    return server.toString();
607  }
608
609  /**
610   * Shut down the specified master cleanly
611   * @param serverNumber Used as index into a list.
612   * @return the region server that was stopped
613   */
614  public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
615    return stopMaster(serverNumber, true);
616  }
617
618  /**
619   * Shut down the specified master cleanly
620   * @param serverNumber Used as index into a list.
621   * @param shutdownFS   True is we are to shutdown the filesystem as part of this master's
622   *                     shutdown. Usually we do but you do not want to do this if you are running
623   *                     multiple master in a test and you shut down one before end of the test.
624   * @return the master that was stopped
625   */
626  public JVMClusterUtil.MasterThread stopMaster(int serverNumber, final boolean shutdownFS) {
627    JVMClusterUtil.MasterThread server = hbaseCluster.getMasters().get(serverNumber);
628    LOG.info("Stopping " + server.toString());
629    server.getMaster().stop("Stopping master " + serverNumber);
630    return server;
631  }
632
633  /**
634   * Wait for the specified master to stop. Removes this thread from list of running threads.
635   * @return Name of master that just went down.
636   */
637  public String waitOnMaster(final int serverNumber) {
638    return this.hbaseCluster.waitOnMaster(serverNumber);
639  }
640
641  /**
642   * Blocks until there is an active master and that master has completed initialization.
643   * @return true if an active master becomes available. false if there are no masters left.
644   */
645  @Override
646  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
647    long start = EnvironmentEdgeManager.currentTime();
648    while (EnvironmentEdgeManager.currentTime() - start < timeout) {
649      for (JVMClusterUtil.MasterThread mt : getMasterThreads()) {
650        if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
651          return true;
652        }
653      }
654      Threads.sleep(100);
655    }
656    return false;
657  }
658
659  /**
660   * Returns list of master threads.
661   */
662  public List<JVMClusterUtil.MasterThread> getMasterThreads() {
663    return this.hbaseCluster.getMasters();
664  }
665
666  /**
667   * Returns list of live master threads (skips the aborted and the killed)
668   */
669  public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
670    return this.hbaseCluster.getLiveMasters();
671  }
672
673  /**
674   * Wait for Mini HBase Cluster to shut down.
675   */
676  public void join() {
677    this.hbaseCluster.join();
678  }
679
680  /**
681   * Shut down the mini HBase cluster
682   */
683  @Override
684  public void shutdown() throws IOException {
685    if (this.hbaseCluster != null) {
686      this.hbaseCluster.shutdown();
687    }
688  }
689
690  @Override
691  public void close() throws IOException {
692  }
693
694  @Override
695  public ClusterMetrics getClusterMetrics() throws IOException {
696    HMaster master = getMaster();
697    return master == null ? null : master.getClusterMetrics();
698  }
699
700  private void executeFlush(HRegion region) throws IOException {
701    if (!RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
702      return;
703    }
704    // retry 5 times if we can not flush
705    for (int i = 0; i < 5; i++) {
706      FlushResult result = region.flush(true);
707      if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) {
708        return;
709      }
710      Threads.sleep(1000);
711    }
712  }
713
714  /**
715   * Call flushCache on all regions on all participating regionservers.
716   */
717  public void flushcache() throws IOException {
718    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
719      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
720        executeFlush(r);
721      }
722    }
723  }
724
725  /**
726   * Call flushCache on all regions of the specified table.
727   */
728  public void flushcache(TableName tableName) throws IOException {
729    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
730      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
731        if (r.getTableDescriptor().getTableName().equals(tableName)) {
732          executeFlush(r);
733        }
734      }
735    }
736  }
737
738  /**
739   * Call flushCache on all regions on all participating regionservers.
740   */
741  public void compact(boolean major) throws IOException {
742    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
743      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
744        if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
745          r.compact(major);
746        }
747      }
748    }
749  }
750
751  /**
752   * Call flushCache on all regions of the specified table.
753   */
754  public void compact(TableName tableName, boolean major) throws IOException {
755    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
756      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
757        if (r.getTableDescriptor().getTableName().equals(tableName)) {
758          if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
759            r.compact(major);
760          }
761        }
762      }
763    }
764  }
765
766  /**
767   * Returns number of live region servers in the cluster currently.
768   */
769  public int getNumLiveRegionServers() {
770    return this.hbaseCluster.getLiveRegionServers().size();
771  }
772
773  /**
774   * Returns list of region server threads. Does not return the master even though it is also a
775   * region server.
776   */
777  public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
778    return this.hbaseCluster.getRegionServers();
779  }
780
781  /**
782   * Returns List of live region server threads (skips the aborted and the killed)
783   */
784  public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
785    return this.hbaseCluster.getLiveRegionServers();
786  }
787
788  /**
789   * Grab a numbered region server of your choice.
790   * @return region server
791   */
792  public HRegionServer getRegionServer(int serverNumber) {
793    return hbaseCluster.getRegionServer(serverNumber);
794  }
795
796  public HRegionServer getRegionServer(ServerName serverName) {
797    return hbaseCluster.getRegionServers().stream().map(t -> t.getRegionServer())
798      .filter(r -> r.getServerName().equals(serverName)).findFirst().orElse(null);
799  }
800
801  public List<HRegion> getRegions(byte[] tableName) {
802    return getRegions(TableName.valueOf(tableName));
803  }
804
805  public List<HRegion> getRegions(TableName tableName) {
806    List<HRegion> ret = new ArrayList<>();
807    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
808      HRegionServer hrs = rst.getRegionServer();
809      for (Region region : hrs.getOnlineRegionsLocalContext()) {
810        if (region.getTableDescriptor().getTableName().equals(tableName)) {
811          ret.add((HRegion) region);
812        }
813      }
814    }
815    return ret;
816  }
817
818  /**
819   * Returns index into List of {@link SingleProcessHBaseCluster#getRegionServerThreads()} of HRS
820   * carrying regionName. Returns -1 if none found.
821   */
822  public int getServerWithMeta() {
823    return getServerWith(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName());
824  }
825
826  /**
827   * Get the location of the specified region
828   * @param regionName Name of the region in bytes
829   * @return Index into List of {@link SingleProcessHBaseCluster#getRegionServerThreads()} of HRS
830   *         carrying hbase:meta. Returns -1 if none found.
831   */
832  public int getServerWith(byte[] regionName) {
833    int index = 0;
834    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
835      HRegionServer hrs = rst.getRegionServer();
836      if (!hrs.isStopped()) {
837        Region region = hrs.getOnlineRegion(regionName);
838        if (region != null) {
839          return index;
840        }
841      }
842      index++;
843    }
844    return -1;
845  }
846
847  @Override
848  public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
849    throws IOException {
850    int index = getServerWith(regionName);
851    if (index < 0) {
852      return null;
853    }
854    return getRegionServer(index).getServerName();
855  }
856
857  /**
858   * Counts the total numbers of regions being served by the currently online region servers by
859   * asking each how many regions they have. Does not look at hbase:meta at all. Count includes
860   * catalog tables.
861   * @return number of regions being served by all region servers
862   */
863  public long countServedRegions() {
864    long count = 0;
865    for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
866      count += rst.getRegionServer().getNumberOfOnlineRegions();
867    }
868    return count;
869  }
870
871  /**
872   * Do a simulated kill all masters and regionservers. Useful when it is impossible to bring the
873   * mini-cluster back for clean shutdown.
874   */
875  public void killAll() {
876    // Do backups first.
877    MasterThread activeMaster = null;
878    for (MasterThread masterThread : getMasterThreads()) {
879      if (!masterThread.getMaster().isActiveMaster()) {
880        masterThread.getMaster().abort("killAll");
881      } else {
882        activeMaster = masterThread;
883      }
884    }
885    // Do active after.
886    if (activeMaster != null) {
887      activeMaster.getMaster().abort("killAll");
888    }
889    for (RegionServerThread rst : getRegionServerThreads()) {
890      rst.getRegionServer().abort("killAll");
891    }
892  }
893
894  @Override
895  public void waitUntilShutDown() {
896    this.hbaseCluster.join();
897  }
898
899  public List<HRegion> findRegionsForTable(TableName tableName) {
900    ArrayList<HRegion> ret = new ArrayList<>();
901    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
902      HRegionServer hrs = rst.getRegionServer();
903      for (Region region : hrs.getRegions(tableName)) {
904        if (region.getTableDescriptor().getTableName().equals(tableName)) {
905          ret.add((HRegion) region);
906        }
907      }
908    }
909    return ret;
910  }
911
912  protected int getRegionServerIndex(ServerName serverName) {
913    // we have a small number of region servers, this should be fine for now.
914    List<RegionServerThread> servers = getRegionServerThreads();
915    for (int i = 0; i < servers.size(); i++) {
916      if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
917        return i;
918      }
919    }
920    return -1;
921  }
922
923  protected int getMasterIndex(ServerName serverName) {
924    List<MasterThread> masters = getMasterThreads();
925    for (int i = 0; i < masters.size(); i++) {
926      if (masters.get(i).getMaster().getServerName().equals(serverName)) {
927        return i;
928      }
929    }
930    return -1;
931  }
932}