001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues;
019
020import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
021import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
022import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.RS_COLUMN;
023import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.TIMESTAMP_COLUMN;
024import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME;
025import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_LENGTH_COLUMN;
026import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_NAME_COLUMN;
027import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_STATE_COLUMN;
028import static org.junit.Assert.assertEquals;
029
030import java.io.IOException;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.List;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.CellUtil;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.HBaseTestingUtility;
042import org.apache.hadoop.hbase.Waiter;
043import org.apache.hadoop.hbase.client.Connection;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Scan;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
049import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
050import org.apache.hadoop.hbase.testclassification.MediumTests;
051import org.apache.hadoop.hbase.testclassification.RegionServerTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.wal.WAL;
054import org.junit.AfterClass;
055import org.junit.Before;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063@Category({ RegionServerTests.class, MediumTests.class })
064public class TestWALEventTracker {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestWALEventTracker.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
071  private static HBaseTestingUtility TEST_UTIL;
072  public static Configuration CONF;
073
074  @BeforeClass
075  public static void setup() throws Exception {
076    CONF = HBaseConfiguration.create();
077    CONF.setBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, true);
078    // Set the chore for less than a second.
079    CONF.setInt(NAMED_QUEUE_CHORE_DURATION_KEY, 900);
080    CONF.setLong(WALEventTrackerTableAccessor.SLEEP_INTERVAL_KEY, 100);
081    TEST_UTIL = new HBaseTestingUtility(CONF);
082    TEST_UTIL.startMiniCluster();
083  }
084
085  @AfterClass
086  public static void teardown() throws Exception {
087    LOG.info("Calling teardown");
088    TEST_UTIL.shutdownMiniHBaseCluster();
089  }
090
091  @Before
092  public void waitForWalEventTrackerTableCreation() {
093    Waiter.waitFor(CONF, 10000,
094      (Waiter.Predicate) () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME));
095  }
096
097  @Test
098  public void testWALRolling() throws Exception {
099    Connection connection = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getConnection();
100    waitForWALEventTrackerTable(connection);
101    List<WAL> wals = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWALs();
102    assertEquals(1, wals.size());
103    AbstractFSWAL wal = (AbstractFSWAL) wals.get(0);
104    Path wal1Path = wal.getOldPath();
105    wal.rollWriter(true);
106
107    FileSystem fs = TEST_UTIL.getTestFileSystem();
108    long wal1Length = fs.getFileStatus(wal1Path).getLen();
109    Path wal2Path = wal.getOldPath();
110    String hostName =
111      TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName().getHostname();
112
113    TEST_UTIL.waitFor(5000, () -> getTableCount(connection) >= 3);
114    List<WALEventTrackerPayload> walEventsList = getRows(hostName, connection);
115
116    // There should be atleast 2 events for wal1Name, with ROLLING and ROLLED state. Most of the
117    // time we will lose ACTIVE event for the first wal creates since hmaster will take some time
118    // to create hbase:waleventtracker table and by that time RS will already create the first wal
119    // and will try to persist it.
120    compareEvents(hostName, wal1Path.getName(), walEventsList,
121      new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ROLLING.name(),
122        WALEventTrackerListener.WalState.ROLLED.name())),
123      false);
124
125    // There should be only 1 event for wal2Name which is current wal, with ACTIVE state
126    compareEvents(hostName, wal2Path.getName(), walEventsList,
127      new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ACTIVE.name())), true);
128
129    // Check that event with wal1Path and state ROLLED has the wal length set.
130    checkWALRolledEventHasSize(walEventsList, wal1Path.getName(), wal1Length);
131  }
132
133  private void checkWALRolledEventHasSize(List<WALEventTrackerPayload> walEvents, String walName,
134    long actualSize) {
135    List<WALEventTrackerPayload> eventsFilteredByNameState = new ArrayList<>();
136    // Filter the list by walName and wal state.
137    for (WALEventTrackerPayload event : walEvents) {
138      if (
139        walName.equals(event.getWalName())
140          && WALEventTrackerListener.WalState.ROLLED.name().equals(event.getState())
141      ) {
142        eventsFilteredByNameState.add(event);
143      }
144    }
145
146    assertEquals(1, eventsFilteredByNameState.size());
147    // We are not comparing the size of the WAL in the tracker table with actual size.
148    // For AsyncWAL implementation, since the WAL file is closed in an async fashion, the WAL length
149    // will always be incorrect.
150    // For FSHLog implementation, we close the WAL in an executor thread. So there will always be
151    // a difference of trailer size bytes.
152    // assertEquals(actualSize, eventsFilteredByNameState.get(0).getWalLength());
153  }
154
155  /**
156   * Compare the events from @{@link WALEventTrackerTableAccessor#WAL_EVENT_TRACKER_TABLE_NAME}
157   * @param hostName       hostname
158   * @param walName        walname
159   * @param walEvents      event from table
160   * @param expectedStates expected states for the hostname and wal name
161   * @param strict         whether to check strictly or not. Sometimes we lose the ACTIVE state
162   *                       event for the first wal since it takes some time for hmaster to create
163   *                       the table and by that time RS already creates the first WAL and will try
164   *                       to persist ACTIVE event to waleventtracker table.
165   */
166  private void compareEvents(String hostName, String walName,
167    List<WALEventTrackerPayload> walEvents, List<String> expectedStates, boolean strict) {
168    List<WALEventTrackerPayload> eventsFilteredByWalName = new ArrayList<>();
169
170    // Assert that all the events have the same host name i.e they came from the same RS.
171    for (WALEventTrackerPayload event : walEvents) {
172      assertEquals(hostName, event.getRsName());
173    }
174
175    // Filter the list by walName.
176    for (WALEventTrackerPayload event : walEvents) {
177      if (walName.equals(event.getWalName())) {
178        eventsFilteredByWalName.add(event);
179      }
180    }
181
182    // Assert that the list of events after filtering by walName should be same as expected states.
183    if (strict) {
184      assertEquals(expectedStates.size(), eventsFilteredByWalName.size());
185    }
186
187    for (WALEventTrackerPayload event : eventsFilteredByWalName) {
188      expectedStates.remove(event.getState());
189    }
190    assertEquals(0, expectedStates.size());
191  }
192
193  private void waitForWALEventTrackerTable(Connection connection) throws IOException {
194    TEST_UTIL.waitFor(5000, () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME));
195  }
196
197  private List<WALEventTrackerPayload> getRows(String rowKeyPrefix, Connection connection)
198    throws IOException {
199    List<WALEventTrackerPayload> list = new ArrayList<>();
200    Scan scan = new Scan();
201    scan.withStartRow(Bytes.toBytes(rowKeyPrefix));
202    Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME);
203    ResultScanner scanner = table.getScanner(scan);
204
205    Result r;
206    while ((r = scanner.next()) != null) {
207      List<Cell> cells = r.listCells();
208      list.add(getPayload(cells));
209    }
210    return list;
211  }
212
213  private WALEventTrackerPayload getPayload(List<Cell> cells) {
214    String rsName = null, walName = null, walState = null;
215    long timestamp = 0L, walLength = 0L;
216    for (Cell cell : cells) {
217      byte[] qualifier = CellUtil.cloneQualifier(cell);
218      byte[] value = CellUtil.cloneValue(cell);
219      String qualifierStr = Bytes.toString(qualifier);
220
221      if (RS_COLUMN.equals(qualifierStr)) {
222        rsName = Bytes.toString(value);
223      } else if (WAL_NAME_COLUMN.equals(qualifierStr)) {
224        walName = Bytes.toString(value);
225      } else if (WAL_STATE_COLUMN.equals(qualifierStr)) {
226        walState = Bytes.toString(value);
227      } else if (TIMESTAMP_COLUMN.equals(qualifierStr)) {
228        timestamp = Bytes.toLong(value);
229      } else if (WAL_LENGTH_COLUMN.equals(qualifierStr)) {
230        walLength = Bytes.toLong(value);
231      }
232    }
233    return new WALEventTrackerPayload(rsName, walName, timestamp, walState, walLength);
234  }
235
236  private int getTableCount(Connection connection) throws Exception {
237    Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME);
238    ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM));
239    int count = 0;
240    while (resultScanner.next() != null) {
241      count++;
242    }
243    LOG.info("Table count: " + count);
244    return count;
245  }
246}