/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertFalse;
022import static org.mockito.ArgumentMatchers.any;
023import static org.mockito.ArgumentMatchers.anyList;
024import static org.mockito.ArgumentMatchers.anyString;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.never;
027import static org.mockito.Mockito.times;
028import static org.mockito.Mockito.verify;
029import static org.mockito.Mockito.when;
030
031import java.io.IOException;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.List;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.Stoppable;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Delete;
041import org.apache.hadoop.hbase.client.Get;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.RegionInfoBuilder;
045import org.apache.hadoop.hbase.client.Result;
046import org.apache.hadoop.hbase.client.ResultScanner;
047import org.apache.hadoop.hbase.client.Scan;
048import org.apache.hadoop.hbase.client.Table;
049import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
050import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
051import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat;
052import org.apache.hadoop.hbase.replication.ReplicationException;
053import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
054import org.apache.hadoop.hbase.testclassification.MasterTests;
055import org.apache.hadoop.hbase.testclassification.MediumTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.junit.After;
059import org.junit.AfterClass;
060import org.junit.BeforeClass;
061import org.junit.ClassRule;
062import org.junit.Rule;
063import org.junit.Test;
064import org.junit.experimental.categories.Category;
065import org.junit.rules.TestName;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
070
071@Category({ MasterTests.class, MediumTests.class })
072public class TestReplicationBarrierCleaner {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076    HBaseClassTestRule.forClass(TestReplicationBarrierCleaner.class);
077
078  private static final Logger LOG = LoggerFactory.getLogger(TestHFileCleaner.class);
079
080  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
081
082  @Rule
083  public final TestName name = new TestName();
084
085  @BeforeClass
086  public static void setUpBeforeClass() throws Exception {
087    UTIL.startMiniCluster(1);
088  }
089
090  @AfterClass
091  public static void tearDownAfterClass() throws Exception {
092    UTIL.shutdownMiniCluster();
093  }
094
095  @After
096  public void tearDown() throws IOException {
097    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
098      ResultScanner scanner = table.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY)
099        .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).setFilter(new FirstKeyOnlyFilter()))) {
100      for (;;) {
101        Result result = scanner.next();
102        if (result == null) {
103          break;
104        }
105        TableName tableName = RegionInfo.getTable(result.getRow());
106        if (!tableName.isSystemTable()) {
107          table.delete(new Delete(result.getRow()));
108        }
109      }
110    }
111  }
112
113  private ReplicationPeerManager create(ReplicationQueueStorage queueStorage,
114    List<String> firstPeerIds, @SuppressWarnings("unchecked") List<String>... peerIds) {
115    ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
116    if (queueStorage != null) {
117      when(peerManager.getQueueStorage()).thenReturn(queueStorage);
118    }
119    if (peerIds.length == 0) {
120      when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds);
121    } else {
122      when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds,
123        peerIds);
124    }
125    return peerManager;
126  }
127
128  private ReplicationQueueStorage create(Long lastPushedSeqId, Long... lastPushedSeqIds)
129    throws ReplicationException {
130    ReplicationQueueStorage queueStorage = mock(ReplicationQueueStorage.class);
131    if (lastPushedSeqIds.length == 0) {
132      when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId);
133    } else {
134      when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId,
135        lastPushedSeqIds);
136    }
137    return queueStorage;
138  }
139
140  private ReplicationBarrierCleaner create(ReplicationPeerManager peerManager) throws IOException {
141    return new ReplicationBarrierCleaner(UTIL.getConfiguration(), new WarnOnlyStoppable(),
142      UTIL.getConnection(), peerManager);
143  }
144
145  private void addBarrier(RegionInfo region, long... barriers) throws IOException {
146    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
147    for (int i = 0; i < barriers.length; i++) {
148      put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
149        put.getTimestamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
150    }
151    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
152      table.put(put);
153    }
154  }
155
156  private void fillCatalogFamily(RegionInfo region) throws IOException {
157    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
158      table.put(new Put(region.getRegionName()).addColumn(HConstants.CATALOG_FAMILY,
159        Bytes.toBytes("whatever"), Bytes.toBytes("whatever")));
160    }
161  }
162
163  private void clearCatalogFamily(RegionInfo region) throws IOException {
164    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
165      table.delete(new Delete(region.getRegionName()).addFamily(HConstants.CATALOG_FAMILY));
166    }
167  }
168
169  @Test
170  public void testNothing() throws IOException {
171    ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
172    ReplicationBarrierCleaner cleaner = create(peerManager);
173    cleaner.chore();
174    verify(peerManager, never()).getSerialPeerIdsBelongsTo(any(TableName.class));
175    verify(peerManager, never()).getQueueStorage();
176  }
177
178  @Test
179  public void testCleanNoPeers() throws IOException {
180    TableName tableName1 = TableName.valueOf(name.getMethodName() + "_1");
181    RegionInfo region11 =
182      RegionInfoBuilder.newBuilder(tableName1).setEndKey(Bytes.toBytes(1)).build();
183    addBarrier(region11, 10, 20, 30, 40, 50, 60);
184    fillCatalogFamily(region11);
185    RegionInfo region12 =
186      RegionInfoBuilder.newBuilder(tableName1).setStartKey(Bytes.toBytes(1)).build();
187    addBarrier(region12, 20, 30, 40, 50, 60, 70);
188    fillCatalogFamily(region12);
189
190    TableName tableName2 = TableName.valueOf(name.getMethodName() + "_2");
191    RegionInfo region21 =
192      RegionInfoBuilder.newBuilder(tableName2).setEndKey(Bytes.toBytes(1)).build();
193    addBarrier(region21, 100, 200, 300, 400);
194    fillCatalogFamily(region21);
195    RegionInfo region22 =
196      RegionInfoBuilder.newBuilder(tableName2).setStartKey(Bytes.toBytes(1)).build();
197    addBarrier(region22, 200, 300, 400, 500, 600);
198    fillCatalogFamily(region22);
199
200    @SuppressWarnings("unchecked")
201    ReplicationPeerManager peerManager =
202      create(null, Collections.emptyList(), Collections.emptyList());
203    ReplicationBarrierCleaner cleaner = create(peerManager);
204    cleaner.chore();
205
206    // should never call this method
207    verify(peerManager, never()).getQueueStorage();
208    // should only be called twice although we have 4 regions to clean
209    verify(peerManager, times(2)).getSerialPeerIdsBelongsTo(any(TableName.class));
210
211    assertArrayEquals(new long[] { 60 }, ReplicationBarrierFamilyFormat
212      .getReplicationBarriers(UTIL.getConnection(), region11.getRegionName()));
213    assertArrayEquals(new long[] { 70 }, ReplicationBarrierFamilyFormat
214      .getReplicationBarriers(UTIL.getConnection(), region12.getRegionName()));
215
216    assertArrayEquals(new long[] { 400 }, ReplicationBarrierFamilyFormat
217      .getReplicationBarriers(UTIL.getConnection(), region21.getRegionName()));
218    assertArrayEquals(new long[] { 600 }, ReplicationBarrierFamilyFormat
219      .getReplicationBarriers(UTIL.getConnection(), region22.getRegionName()));
220  }
221
222  @Test
223  public void testDeleteBarriers() throws IOException, ReplicationException {
224    TableName tableName = TableName.valueOf(name.getMethodName());
225    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
226    addBarrier(region, 10, 20, 30, 40, 50, 60);
227    // two peers
228    ReplicationQueueStorage queueStorage = create(-1L, 2L, 15L, 25L, 20L, 25L, 65L, 55L, 70L, 70L);
229    List<String> peerIds = Lists.newArrayList("1", "2");
230
231    @SuppressWarnings("unchecked")
232    ReplicationPeerManager peerManager =
233      create(queueStorage, peerIds, peerIds, peerIds, peerIds, peerIds);
234    ReplicationBarrierCleaner cleaner = create(peerManager);
235
236    // beyond the first barrier, no deletion
237    cleaner.chore();
238    assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 }, ReplicationBarrierFamilyFormat
239      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));
240
241    // in the first range, still no deletion
242    cleaner.chore();
243    assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 }, ReplicationBarrierFamilyFormat
244      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));
245
246    // in the second range, 10 is deleted
247    cleaner.chore();
248    assertArrayEquals(new long[] { 20, 30, 40, 50, 60 }, ReplicationBarrierFamilyFormat
249      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));
250
251    // between 50 and 60, so the barriers before 50 will be deleted
252    cleaner.chore();
253    assertArrayEquals(new long[] { 50, 60 }, ReplicationBarrierFamilyFormat
254      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));
255
256    // in the last open range, 50 is deleted
257    cleaner.chore();
258    assertArrayEquals(new long[] { 60 }, ReplicationBarrierFamilyFormat
259      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));
260  }
261
262  @Test
263  public void testDeleteRowForDeletedRegion() throws IOException, ReplicationException {
264    TableName tableName = TableName.valueOf(name.getMethodName());
265    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
266    addBarrier(region, 40, 50, 60);
267    fillCatalogFamily(region);
268
269    String peerId = "1";
270    ReplicationQueueStorage queueStorage = create(59L);
271    @SuppressWarnings("unchecked")
272    ReplicationPeerManager peerManager = create(queueStorage, Lists.newArrayList(peerId));
273    ReplicationBarrierCleaner cleaner = create(peerManager);
274
275    // we have something in catalog family, so only delete 40
276    cleaner.chore();
277    assertArrayEquals(new long[] { 50, 60 }, ReplicationBarrierFamilyFormat
278      .getReplicationBarriers(UTIL.getConnection(), region.getRegionName()));
279    verify(queueStorage, never()).removeLastSequenceIds(anyString(), anyList());
280
281    // No catalog family, then we should remove the whole row
282    clearCatalogFamily(region);
283    cleaner.chore();
284    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
285      assertFalse(table
286        .exists(new Get(region.getRegionName()).addFamily(HConstants.REPLICATION_BARRIER_FAMILY)));
287    }
288    verify(queueStorage, times(1)).removeLastSequenceIds(peerId,
289      Arrays.asList(region.getEncodedName()));
290  }
291
292  @Test
293  public void testDeleteRowForDeletedRegionNoPeers() throws IOException {
294    TableName tableName = TableName.valueOf(name.getMethodName());
295    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
296    addBarrier(region, 40, 50, 60);
297
298    ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
299    ReplicationBarrierCleaner cleaner = create(peerManager);
300    cleaner.chore();
301
302    verify(peerManager, times(1)).getSerialPeerIdsBelongsTo(tableName);
303    // There are no peers, and no catalog family for this region either, so we should remove the
304    // barriers. And since there is no catalog family, after we delete the barrier family, the whole
305    // row is deleted.
306    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
307      assertFalse(table.exists(new Get(region.getRegionName())));
308    }
309  }
310
311  private static class WarnOnlyStoppable implements Stoppable {
312    @Override
313    public void stop(String why) {
314      LOG.warn("TestReplicationBarrierCleaner received stop, ignoring. Reason: " + why);
315    }
316
317    @Override
318    public boolean isStopped() {
319      return false;
320    }
321  }
322}