/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TestMetaTableAccessor;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.StringUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole cluster.
 * See {@link TestRegionServerNoMaster}.
 */
@Category({ RegionServerTests.class, LargeTests.class })
public class TestRegionReplicas {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionReplicas.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicas.class);

  private static final int NB_SERVERS = 1;
  private static Table table;
  private static final byte[] row = Bytes.toBytes("TestRegionReplicas");

  private static HRegionInfo hriPrimary;
  private static HRegionInfo hriSecondary;

  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  @BeforeClass
  public static void before() throws Exception {
    // Reduce the hdfs block size and prefetch to trigger the file-link reopen
    // when the file is moved to archive (e.g. compaction)
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192);
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1);
    HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);

    HTU.startMiniCluster(NB_SERVERS);
    final TableName tableName = TableName.valueOf(TestRegionReplicas.class.getSimpleName());

    // Create table then get the single region for our new table.
    table = HTU.createTable(tableName, f);

    try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
      hriPrimary = locator.getRegionLocation(row, false).getRegionInfo();
    }

    // mock a secondary region info to open
    hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
      hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);

    // No master
    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
  }

  @AfterClass
  public static void afterClass() throws Exception {
    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
    table.close();
    HTU.shutdownMiniCluster();
  }

  private HRegionServer getRS() {
    return HTU.getMiniHBaseCluster().getRegionServer(0);
  }

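  /** Tests that a secondary region replica can be opened while the primary keeps serving reads. */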
  @Test
  public void testOpenRegionReplica() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);

      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /** Tests that the meta location is saved for secondary regions */
  @Test
  public void testRegionReplicaUpdatesMetaLocation() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    try (Table meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME)) {
      TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName(),
        getRS().getServerName(), -1, 1, false);
    } finally {
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

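  /**
   * Tests reads from a secondary replica after a flush on the primary, both directly against the
   * region and through the region server's RPC layer.
   */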
  @Test
  public void testRegionReplicaGets() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // flush so that region replica can read
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // first try directly against region
      region = getRS().getRegion(hriSecondary.getEncodedName());
      assertGet(region, 42, true);

      assertGetRpc(hriSecondary, 42, true);
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

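  /** Tests a client-level Get with TIMELINE consistency explicitly targeted at replica id 1. */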
  @Test
  public void testGetOnTargetRegionReplica() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // flush so that region replica can read
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // try a Get directly against the region replica
      byte[] row = Bytes.toBytes(String.valueOf(42));
      Get get = new Get(row);
      get.setConsistency(Consistency.TIMELINE);
      get.setReplicaId(1);
      Result result = table.get(get);
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

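  // Asserts that a Get directly against the given region returns the numeric row when expect is
  // true, or an empty result when expect is false.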
  private void assertGet(Region region, int value, boolean expect) throws IOException {
    byte[] row = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(row);
    Result result = region.get(get);
    if (expect) {
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } else {
      Assert.assertTrue(result.isEmpty());
    }
  }

  // Builds a Get request and issues it directly against the region server's RPC services,
  // exercising the same code path a client RPC would.
  private void assertGetRpc(HRegionInfo info, int value, boolean expect)
    throws IOException, ServiceException {
    byte[] row = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(row);
    ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
    ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq);
    Result result = ProtobufUtil.toResult(getResp.getResult());
    if (expect) {
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } else {
      Assert.assertTrue(result.isEmpty());
    }
  }

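  // Tears the mini cluster down and brings it back up so that configuration changes made by a
  // test (e.g. the store file refresh period) take effect on the region server.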
  private void restartRegionServer() throws Exception {
    afterClass();
    before();
  }

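  /**
   * Tests that the store file refresher chore makes flushes and compactions that happen on the
   * primary visible to reads from the secondary replica.
   */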
  @Test
  public void testRefreshStoreFiles() throws Exception {
    // enable store file refreshing
    final int refreshPeriod = 2000; // 2 sec
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region {}", hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary
      LOG.info("Loading data to primary region");
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // flush so that region replica can read
      LOG.info("Flushing primary region");
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that chore is run
      LOG.info("Sleeping for {} ms", 4 * refreshPeriod);
      Threads.sleep(4 * refreshPeriod);

      LOG.info("Checking results from secondary region replica");
      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
      Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());

      assertGet(secondaryRegion, 42, true);
      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, false);

      // load some more data to primary and flush after each batch
      HTU.loadNumericRows(table, f, 1000, 1100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      HTU.loadNumericRows(table, f, 2000, 2100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that chore is run
      Threads.sleep(4 * refreshPeriod);

      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, true);
      assertGetRpc(hriSecondary, 2042, true);

      // ensure that we see the 3 store files
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force compaction
      HTU.compact(table.getName(), true);

      long wakeUpTime = EnvironmentEdgeManager.currentTime() + 4 * refreshPeriod;
      while (EnvironmentEdgeManager.currentTime() < wakeUpTime) {
        assertGetRpc(hriSecondary, 42, true);
        assertGetRpc(hriSecondary, 1042, true);
        assertGetRpc(hriSecondary, 2042, true);
        Threads.sleep(10);
      }

      // ensure that we see the compacted file plus the 3 compacted-away files;
      // the count stays at 4 until the cleaner chore removes the old files
      Assert.assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount());
    } finally {
      // rows up to key 2100 were loaded above
      HTU.deleteNumericRows(table, f, 0, 2100);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

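  /**
   * Stress test: one thread keeps writing to the primary, another randomly flushes or compacts it,
   * and a third keeps reading from the secondary, occasionally closing and reopening it. Asserts
   * that none of the threads sees a failure.
   */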
  @Test
  public void testFlushAndCompactionsInPrimary() throws Exception {
    long runtime = 30 * 1000;
    // enable store file refreshing
    final int refreshPeriod = 100; // 100ms refresh is a lot
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();
    final int startKey = 0, endKey = 1000;

    try {
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary so that reader won't fail
      HTU.loadNumericRows(table, f, startKey, endKey);
      TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
      // ensure that chore is run
      Threads.sleep(2 * refreshPeriod);

      final AtomicBoolean running = new AtomicBoolean(true);
      @SuppressWarnings("unchecked")
      final AtomicReference<Exception>[] exceptions = new AtomicReference[3];
      for (int i = 0; i < exceptions.length; i++) {
        exceptions[i] = new AtomicReference<>();
      }

      Runnable writer = new Runnable() {
        int key = startKey;

        @Override
        public void run() {
          try {
            while (running.get()) {
              byte[] data = Bytes.toBytes(String.valueOf(key));
              Put put = new Put(data);
              put.addColumn(f, null, data);
              table.put(put);
              key++;
              if (key == endKey) {
                key = startKey;
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex.toString(), ex);
            exceptions[0].compareAndSet(null, ex);
          }
        }
      };

      Runnable flusherCompactor = new Runnable() {
        @Override
        public void run() {
          try {
            // ThreadLocalRandom must be obtained on the thread that uses it
            Random random = ThreadLocalRandom.current();
            while (running.get()) {
              // flush or compact
              if (random.nextBoolean()) {
                TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
              } else {
                HTU.compact(table.getName(), random.nextBoolean());
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex.toString(), ex);
            exceptions[1].compareAndSet(null, ex);
          }
        }
      };

      Runnable reader = new Runnable() {
        @Override
        public void run() {
          try {
            Random random = ThreadLocalRandom.current();
            while (running.get()) {
              // whether to do a close and open
              if (random.nextInt(10) == 0) {
                try {
                  closeRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed closing the region " + hriSecondary + " "
                    + StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
                try {
                  openRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed opening the region " + hriSecondary + " "
                    + StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
              }

              int key = random.nextInt(endKey - startKey) + startKey;
              assertGetRpc(hriSecondary, key, true);
            }
          } catch (Exception ex) {
            LOG.warn("Failed getting the value in the region " + hriSecondary + " "
              + StringUtils.stringifyException(ex));
            exceptions[2].compareAndSet(null, ex);
          }
        }
      };

      LOG.info("Starting writer and reader, secondary={}", hriSecondary.getEncodedName());
      ExecutorService executor = Executors.newFixedThreadPool(3);
      executor.submit(writer);
      executor.submit(flusherCompactor);
      executor.submit(reader);

      // let the threads run for the configured time, then stop and wait for them
      Threads.sleep(runtime);
      running.set(false);
      executor.shutdown();
      executor.awaitTermination(30, TimeUnit.SECONDS);

      for (AtomicReference<Exception> exRef : exceptions) {
        Assert.assertNull(exRef.get());
      }
    } finally {
      HTU.deleteNumericRows(table, f, startKey, endKey);
      try {
        closeRegion(HTU, getRS(), hriSecondary);
      } catch (ServiceException e) {
        LOG.info("Exception while closing the region {}, it may have already been closed by the "
          + "reader thread", hriSecondary, e);
      }
    }
  }

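  /**
   * Tests that the secondary replica can still read its store files through FileLinks after the
   * primary has major-compacted them away and the compacted files discharger has archived them.
   */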
  @Test
  public void testVerifySecondaryAbilityToReadWithOnFiles() throws Exception {
    // disable the store file refresh chore (we do this by hand)
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region {}", hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary, flushing after each batch to create three store files
      LOG.info("Loading data to primary region");
      for (int i = 0; i < 3; ++i) {
        HTU.loadNumericRows(table, f, i * 1000, (i + 1) * 1000);
        HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
        region.flush(true);
      }

      HRegion primaryRegion = getRS().getRegion(hriPrimary.getEncodedName());
      Assert.assertEquals(3, primaryRegion.getStore(f).getStorefilesCount());

      // refresh store files on the secondary by hand
      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
      secondaryRegion.getStore(f).refreshStoreFiles();
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force a major compaction on the primary
      LOG.info("Forcing a major compaction on primary region {}", hriPrimary);
      primaryRegion.compact(true);
      Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
      List<RegionServerThread> regionServerThreads =
        HTU.getMiniHBaseCluster().getRegionServerThreads();
      HRegionServer hrs = null;
      for (RegionServerThread rs : regionServerThreads) {
        if (
          rs.getRegionServer().getOnlineRegion(primaryRegion.getRegionInfo().getRegionName())
              != null
        ) {
          hrs = rs.getRegionServer();
          break;
        }
      }
      Assert.assertNotNull(hrs);
      CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, hrs, false);
      cleaner.chore();
      // Scan all the hfiles on the secondary. Since there have been no reads on the secondary,
      // asking the NN for block locations returns a FileNotFoundException; the FileLink should
      // handle it and still give us all the results we expect.
      int keys = 0;
      int sum = 0;
      for (HStoreFile sf : ((HStore) secondaryRegion.getStore(f)).getStorefiles()) {
        // our file does not exist anymore; it was moved to the archive by the compaction above
        LOG.debug(Boolean.toString(getRS().getFileSystem().exists(sf.getPath())));
        Assert.assertFalse(getRS().getFileSystem().exists(sf.getPath()));

        HFileScanner scanner = sf.getReader().getScanner(false, false);
        scanner.seekTo();
        do {
          keys++;

          Cell cell = scanner.getCell();
          sum += Integer
            .parseInt(Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
        } while (scanner.next());
      }
      Assert.assertEquals(3000, keys);
      Assert.assertEquals(4498500, sum);
    } finally {
      // rows 0 to 3000 were loaded above, clean them all up
      HTU.deleteNumericRows(table, f, 0, 3000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }
}