001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellScanner;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.ClientMetaTableAccessor;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.MetaTableAccessor;
039import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.Waiter;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.ConnectionFactory;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.RegionInfo;
046import org.apache.hadoop.hbase.client.RegionLocator;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.regionserver.HRegion;
051import org.apache.hadoop.hbase.regionserver.HRegionServer;
052import org.apache.hadoop.hbase.regionserver.Region;
053import org.apache.hadoop.hbase.regionserver.RegionScanner;
054import org.apache.hadoop.hbase.testclassification.LargeTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
057import org.junit.After;
058import org.junit.Before;
059import org.junit.ClassRule;
060import org.junit.Rule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.junit.rules.TestName;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067/**
068 * Tests region replication for hbase:meta by setting up region replicas and verifying async wal
069 * replication replays the edits to the secondary region in various scenarios.
070 * @see TestRegionReplicaReplication
071 */
072@Category({ LargeTests.class })
073public class TestMetaRegionReplicaReplication {
074
075  @ClassRule
076  public static final HBaseClassTestRule CLASS_RULE =
077    HBaseClassTestRule.forClass(TestMetaRegionReplicaReplication.class);
078  private static final Logger LOG = LoggerFactory.getLogger(TestMetaRegionReplicaReplication.class);
079  private static final int NB_SERVERS = 4;
080  private final HBaseTestingUtil HTU = new HBaseTestingUtil();
081  private int numOfMetaReplica = NB_SERVERS - 1;
082  private static byte[] VALUE = Bytes.toBytes("value");
083
084  @Rule
085  public TestName name = new TestName();
086
087  @Before
088  public void before() throws Exception {
089    Configuration conf = HTU.getConfiguration();
090    conf.setInt("zookeeper.recovery.retry", 1);
091    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
092    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
093    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
094    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
095    // Enable hbase:meta replication.
096    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
097    // Set hbase:meta replicas to be 3.
098    // conf.setInt(HConstants.META_REPLICAS_NUM, numOfMetaReplica);
099    HTU.startMiniCluster(NB_SERVERS);
100    // Enable hbase:meta replication.
101    HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, numOfMetaReplica);
102
103    HTU.waitFor(30000, () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size()
104        >= numOfMetaReplica);
105  }
106
107  @After
108  public void after() throws Exception {
109    HTU.shutdownMiniCluster();
110  }
111
112  /**
113   * Test meta region replica replication. Create some tables and see if replicas pick up the
114   * additions.
115   */
116  @Test
117  public void testHBaseMetaReplicates() throws Exception {
118    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"),
119      HConstants.CATALOG_FAMILY,
120      Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
121      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
122    }
123    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_1"),
124      HConstants.CATALOG_FAMILY,
125      Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
126      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
127      // Try delete.
128      HTU.deleteTableIfAny(table.getName());
129      verifyDeletedReplication(TableName.META_TABLE_NAME, numOfMetaReplica, table.getName());
130    }
131  }
132
133  @Test
134  public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Exception {
135    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
136      Table table = connection.getTable(TableName.META_TABLE_NAME)) {
137      // load the data to the table
138      for (int i = 0; i < 5; i++) {
139        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
140        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
141        LOG.info("flushing table");
142        HTU.flush(TableName.META_TABLE_NAME);
143        LOG.info("compacting table");
144        if (i < 4) {
145          HTU.compact(TableName.META_TABLE_NAME, false);
146        }
147      }
148
149      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
150        HConstants.CATALOG_FAMILY);
151    }
152  }
153
154  @Test
155  public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception {
156    SingleProcessHBaseCluster cluster = HTU.getMiniHBaseCluster();
157    HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
158
159    HRegionServer hrsNoMetaReplica = null;
160    HRegionServer server = null;
161    Region metaReplica = null;
162    boolean hostingMeta;
163
164    for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
165      server = cluster.getRegionServer(i);
166      hostingMeta = false;
167      if (server == hrs) {
168        continue;
169      }
170      for (Region region : server.getOnlineRegionsLocalContext()) {
171        if (region.getRegionInfo().isMetaRegion()) {
172          if (metaReplica == null) {
173            metaReplica = region;
174          }
175          hostingMeta = true;
176          break;
177        }
178      }
179      if (!hostingMeta) {
180        hrsNoMetaReplica = server;
181      }
182    }
183    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
184      Table table = connection.getTable(TableName.META_TABLE_NAME)) {
185      // load the data to the table
186      for (int i = 0; i < 5; i++) {
187        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
188        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
189        if (i == 0) {
190          HTU.moveRegionAndWait(metaReplica.getRegionInfo(), hrsNoMetaReplica.getServerName());
191        }
192      }
193
194      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
195        HConstants.CATALOG_FAMILY);
196    }
197  }
198
199  protected void verifyReplication(TableName tableName, int regionReplication, final int startRow,
200    final int endRow, final byte[] family) throws Exception {
201    verifyReplication(tableName, regionReplication, startRow, endRow, family, true);
202  }
203
204  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
205    final int endRow, final byte[] family, final boolean present) throws Exception {
206    // find the regions
207    final Region[] regions = new Region[regionReplication];
208
209    for (int i = 0; i < NB_SERVERS; i++) {
210      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
211      List<HRegion> onlineRegions = rs.getRegions(tableName);
212      for (HRegion region : onlineRegions) {
213        regions[region.getRegionInfo().getReplicaId()] = region;
214      }
215    }
216
217    for (Region region : regions) {
218      assertNotNull(region);
219    }
220
221    for (int i = 1; i < regionReplication; i++) {
222      final Region region = regions[i];
223      // wait until all the data is replicated to all secondary regions
224      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
225        @Override
226        public boolean evaluate() throws Exception {
227          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
228          try {
229            HTU.verifyNumericRows(region, family, startRow, endRow, present);
230          } catch (Throwable ex) {
231            LOG.warn("Verification from secondary region is not complete yet", ex);
232            // still wait
233            return false;
234          }
235          return true;
236        }
237      });
238    }
239  }
240
241  /**
242   * Scan hbase:meta for <code>tableName</code> content.
243   */
244  private List<Result> getMetaCells(TableName tableName) throws IOException {
245    final List<Result> results = new ArrayList<>();
246    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
247      @Override
248      public boolean visit(Result r) throws IOException {
249        results.add(r);
250        return true;
251      }
252    };
253    MetaTableAccessor.scanMetaForTableRegions(HTU.getConnection(), visitor, tableName);
254    return results;
255  }
256
257  /** Returns All Regions for tableName including Replicas. */
258  private Region[] getAllRegions(TableName tableName, int replication) {
259    final Region[] regions = new Region[replication];
260    for (int i = 0; i < NB_SERVERS; i++) {
261      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
262      List<HRegion> onlineRegions = rs.getRegions(tableName);
263      for (HRegion region : onlineRegions) {
264        regions[region.getRegionInfo().getReplicaId()] = region;
265      }
266    }
267    for (Region region : regions) {
268      assertNotNull(region);
269    }
270    return regions;
271  }
272
273  /**
274   * Verify when a Table is deleted from primary, then there are no references in replicas (because
275   * they get the delete of the table rows too).
276   */
277  private void verifyDeletedReplication(TableName tableName, int regionReplication,
278    final TableName deletedTableName) {
279    final Region[] regions = getAllRegions(tableName, regionReplication);
280
281    // Start count at '1' so we skip default, primary replica and only look at secondaries.
282    for (int i = 1; i < regionReplication; i++) {
283      final Region region = regions[i];
284      // wait until all the data is replicated to all secondary regions
285      Waiter.waitFor(HTU.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
286        @Override
287        public boolean evaluate() throws Exception {
288          LOG.info("Verifying replication for region replica {}", region.getRegionInfo());
289          try (RegionScanner rs = region.getScanner(new Scan())) {
290            List<Cell> cells = new ArrayList<>();
291            while (rs.next(cells)) {
292              continue;
293            }
294            return doesNotContain(cells, deletedTableName);
295          } catch (Throwable ex) {
296            LOG.warn("Verification from secondary region is not complete yet", ex);
297            // still wait
298            return false;
299          }
300        }
301      });
302    }
303  }
304
305  /**
306   * Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed by
307   * HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
308   * <code>cells</code>.
309   */
310  private boolean doesNotContain(List<Cell> cells, TableName tableName) {
311    for (Cell cell : cells) {
312      String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
313      if (row.startsWith(tableName.toString() + HConstants.DELIMITER)) {
314        return false;
315      }
316    }
317    return true;
318  }
319
320  /**
321   * Verify Replicas have results (exactly).
322   */
323  private void verifyReplication(TableName tableName, int regionReplication,
324    List<Result> contains) {
325    final Region[] regions = getAllRegions(tableName, regionReplication);
326
327    // Start count at '1' so we skip default, primary replica and only look at secondaries.
328    for (int i = 1; i < regionReplication; i++) {
329      final Region region = regions[i];
330      // wait until all the data is replicated to all secondary regions
331      Waiter.waitFor(HTU.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
332        @Override
333        public boolean evaluate() throws Exception {
334          LOG.info("Verifying replication for region replica {}", region.getRegionInfo());
335          try (RegionScanner rs = region.getScanner(new Scan())) {
336            List<Cell> cells = new ArrayList<>();
337            while (rs.next(cells)) {
338              continue;
339            }
340            return contains(contains, cells);
341          } catch (Throwable ex) {
342            LOG.warn("Verification from secondary region is not complete yet", ex);
343            // still wait
344            return false;
345          }
346        }
347      });
348    }
349  }
350
351  /**
352   * Presumes sorted Cells. Verify that <code>cells</code> has <code>contains</code> at least.
353   */
354  static boolean contains(List<Result> contains, List<Cell> cells) throws IOException {
355    CellScanner containsScanner = CellUtil.createCellScanner(contains);
356    CellScanner cellsScanner = CellUtil.createCellScanner(cells);
357    int matches = 0;
358    int count = 0;
359    while (containsScanner.advance()) {
360      while (cellsScanner.advance()) {
361        count++;
362        LOG.info("{} {}", containsScanner.current(), cellsScanner.current());
363        if (containsScanner.current().equals(cellsScanner.current())) {
364          matches++;
365          break;
366        }
367      }
368    }
369    return !containsScanner.advance() && matches >= 1 && count >= matches && count == cells.size();
370  }
371
372  private void doNGets(final Table table, final byte[][] keys) throws Exception {
373    for (byte[] key : keys) {
374      Result r = table.get(new Get(key));
375      assertArrayEquals(VALUE, r.getValue(HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY));
376    }
377  }
378
379  private void primaryNoChangeReplicaIncrease(final long[] before, final long[] after) {
380    assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID], after[RegionInfo.DEFAULT_REPLICA_ID]);
381
382    for (int i = 1; i < after.length; i++) {
383      assertTrue(after[i] > before[i]);
384    }
385  }
386
387  private void primaryIncreaseReplicaNoChange(final long[] before, final long[] after) {
388    // There are read requests increase for primary meta replica.
389    assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] > before[RegionInfo.DEFAULT_REPLICA_ID]);
390
391    // No change for replica regions
392    for (int i = 1; i < after.length; i++) {
393      assertEquals(before[i], after[i]);
394    }
395  }
396
397  private void primaryIncreaseReplicaIncrease(final long[] before, final long[] after) {
398    // There are read requests increase for all meta replica regions,
399    for (int i = 0; i < after.length; i++) {
400      assertTrue(after[i] > before[i]);
401    }
402  }
403
404  private void getMetaReplicaReadRequests(final Region[] metaRegions, final long[] counters) {
405    int i = 0;
406    for (Region r : metaRegions) {
407      LOG.info("read request for region {} is {}", r, r.getReadRequestsCount());
408      counters[i] = r.getReadRequestsCount();
409      i++;
410    }
411  }
412
413  @Test
414  public void testHBaseMetaReplicaGets() throws Exception {
415    TableName tn = TableName.valueOf(this.name.getMethodName());
416    final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
417    long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
418    long[] readReqsForMetaReplicasAfterGet = new long[numOfMetaReplica];
419    long[] readReqsForMetaReplicasAfterGetAllLocations = new long[numOfMetaReplica];
420    long[] readReqsForMetaReplicasAfterMove = new long[numOfMetaReplica];
421    long[] readReqsForMetaReplicasAfterSecondMove = new long[numOfMetaReplica];
422    long[] readReqsForMetaReplicasAfterThirdGet = new long[numOfMetaReplica];
423    Region userRegion = null;
424    HRegionServer srcRs = null;
425    HRegionServer destRs = null;
426
427    try (Table table = HTU.createTable(tn, HConstants.CATALOG_FAMILY,
428      Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
429      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
430      // load different values
431      HTU.loadTable(table, new byte[][] { HConstants.CATALOG_FAMILY }, VALUE);
432      for (int i = 0; i < NB_SERVERS; i++) {
433        HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
434        List<HRegion> onlineRegions = rs.getRegions(tn);
435        if (onlineRegions.size() > 0) {
436          userRegion = onlineRegions.get(0);
437          srcRs = rs;
438          if (i > 0) {
439            destRs = HTU.getMiniHBaseCluster().getRegionServer(0);
440          } else {
441            destRs = HTU.getMiniHBaseCluster().getRegionServer(1);
442          }
443        }
444      }
445
446      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicas);
447
448      Configuration c = new Configuration(HTU.getConfiguration());
449      c.setBoolean(HConstants.USE_META_REPLICAS, true);
450      c.set(LOCATOR_META_REPLICAS_MODE, "LoadBalance");
451      Connection connection = ConnectionFactory.createConnection(c);
452      Table tableForGet = connection.getTable(tn);
453      byte[][] getRows = new byte[HBaseTestingUtil.KEYS.length][];
454
455      int i = 0;
456      for (byte[] key : HBaseTestingUtil.KEYS) {
457        getRows[i] = key;
458        i++;
459      }
460      getRows[0] = Bytes.toBytes("aaa");
461      doNGets(tableForGet, getRows);
462
463      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGet);
464
465      // There are more reads against all meta replica regions, including the primary region.
466      primaryIncreaseReplicaIncrease(readReqsForMetaReplicas, readReqsForMetaReplicasAfterGet);
467
468      RegionLocator locator = tableForGet.getRegionLocator();
469
470      for (int j = 0; j < numOfMetaReplica * 3; j++) {
471        locator.getAllRegionLocations();
472      }
473
474      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGetAllLocations);
475      primaryIncreaseReplicaIncrease(readReqsForMetaReplicasAfterGet,
476        readReqsForMetaReplicasAfterGetAllLocations);
477
478      // move one of regions so it meta cache may be invalid.
479      HTU.moveRegionAndWait(userRegion.getRegionInfo(), destRs.getServerName());
480
481      doNGets(tableForGet, getRows);
482
483      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterMove);
484
485      // There are read requests increase for primary meta replica.
486      // For rest of meta replicas, there is no change as regionMove will tell the new location
487      primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGetAllLocations,
488        readReqsForMetaReplicasAfterMove);
489      // Move region again.
490      HTU.moveRegionAndWait(userRegion.getRegionInfo(), srcRs.getServerName());
491
492      // Wait until moveRegion cache timeout.
493      while (destRs.getMovedRegion(userRegion.getRegionInfo().getEncodedName()) != null) {
494        Thread.sleep(1000);
495      }
496
497      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterSecondMove);
498
499      // There are read requests increase for primary meta replica.
500      // For rest of meta replicas, there is no change.
501      primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterMove,
502        readReqsForMetaReplicasAfterSecondMove);
503
504      doNGets(tableForGet, getRows);
505
506      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterThirdGet);
507
508      // Since it gets RegionNotServedException, it will go to primary for the next lookup.
509      // There are read requests increase for primary meta replica.
510      // For rest of meta replicas, there is no change.
511      primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterSecondMove,
512        readReqsForMetaReplicasAfterThirdGet);
513    }
514  }
515}