001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues; 019 020import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY; 021import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY; 022import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.RS_COLUMN; 023import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.TIMESTAMP_COLUMN; 024import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME; 025import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_LENGTH_COLUMN; 026import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_NAME_COLUMN; 027import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_STATE_COLUMN; 028import static org.junit.Assert.assertEquals; 029 030import java.io.IOException; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.List; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.Cell; 038import org.apache.hadoop.hbase.CellUtil; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseConfiguration; 041import org.apache.hadoop.hbase.HBaseTestingUtility; 042import org.apache.hadoop.hbase.Waiter; 043import org.apache.hadoop.hbase.client.Connection; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.ResultScanner; 046import org.apache.hadoop.hbase.client.Scan; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 049import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; 050import org.apache.hadoop.hbase.testclassification.MediumTests; 051import org.apache.hadoop.hbase.testclassification.RegionServerTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.wal.WAL; 054import org.junit.AfterClass; 055import org.junit.Before; 056import org.junit.BeforeClass; 057import org.junit.ClassRule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063@Category({ RegionServerTests.class, MediumTests.class }) 064public class TestWALEventTracker { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestWALEventTracker.class); 069 070 private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class); 071 private static HBaseTestingUtility TEST_UTIL; 072 public static Configuration CONF; 073 074 @BeforeClass 075 public static void setup() throws Exception { 076 CONF = HBaseConfiguration.create(); 077 CONF.setBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, true); 078 // Set the chore for less than a second. 079 CONF.setInt(NAMED_QUEUE_CHORE_DURATION_KEY, 900); 080 CONF.setLong(WALEventTrackerTableAccessor.SLEEP_INTERVAL_KEY, 100); 081 TEST_UTIL = new HBaseTestingUtility(CONF); 082 TEST_UTIL.startMiniCluster(); 083 } 084 085 @AfterClass 086 public static void teardown() throws Exception { 087 LOG.info("Calling teardown"); 088 TEST_UTIL.shutdownMiniHBaseCluster(); 089 } 090 091 @Before 092 public void waitForWalEventTrackerTableCreation() { 093 Waiter.waitFor(CONF, 10000, 094 (Waiter.Predicate) () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME)); 095 } 096 097 @Test 098 public void testWALRolling() throws Exception { 099 Connection connection = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getConnection(); 100 waitForWALEventTrackerTable(connection); 101 List<WAL> wals = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWALs(); 102 assertEquals(1, wals.size()); 103 AbstractFSWAL wal = (AbstractFSWAL) wals.get(0); 104 Path wal1Path = wal.getOldPath(); 105 wal.rollWriter(true); 106 107 FileSystem fs = TEST_UTIL.getTestFileSystem(); 108 long wal1Length = fs.getFileStatus(wal1Path).getLen(); 109 Path wal2Path = wal.getOldPath(); 110 String hostName = 111 TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName().getHostname(); 112 113 TEST_UTIL.waitFor(5000, () -> getTableCount(connection) >= 3); 114 List<WALEventTrackerPayload> walEventsList = getRows(hostName, connection); 115 116 // There should be atleast 2 events for wal1Name, with ROLLING and ROLLED state. Most of the 117 // time we will lose ACTIVE event for the first wal creates since hmaster will take some time 118 // to create hbase:waleventtracker table and by that time RS will already create the first wal 119 // and will try to persist it. 120 compareEvents(hostName, wal1Path.getName(), walEventsList, 121 new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ROLLING.name(), 122 WALEventTrackerListener.WalState.ROLLED.name())), 123 false); 124 125 // There should be only 1 event for wal2Name which is current wal, with ACTIVE state 126 compareEvents(hostName, wal2Path.getName(), walEventsList, 127 new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ACTIVE.name())), true); 128 129 // Check that event with wal1Path and state ROLLED has the wal length set. 130 checkWALRolledEventHasSize(walEventsList, wal1Path.getName(), wal1Length); 131 } 132 133 private void checkWALRolledEventHasSize(List<WALEventTrackerPayload> walEvents, String walName, 134 long actualSize) { 135 List<WALEventTrackerPayload> eventsFilteredByNameState = new ArrayList<>(); 136 // Filter the list by walName and wal state. 137 for (WALEventTrackerPayload event : walEvents) { 138 if ( 139 walName.equals(event.getWalName()) 140 && WALEventTrackerListener.WalState.ROLLED.name().equals(event.getState()) 141 ) { 142 eventsFilteredByNameState.add(event); 143 } 144 } 145 146 assertEquals(1, eventsFilteredByNameState.size()); 147 // We are not comparing the size of the WAL in the tracker table with actual size. 148 // For AsyncWAL implementation, since the WAL file is closed in an async fashion, the WAL length 149 // will always be incorrect. 150 // For FSHLog implementation, we close the WAL in an executor thread. So there will always be 151 // a difference of trailer size bytes. 152 // assertEquals(actualSize, eventsFilteredByNameState.get(0).getWalLength()); 153 } 154 155 /** 156 * Compare the events from @{@link WALEventTrackerTableAccessor#WAL_EVENT_TRACKER_TABLE_NAME} 157 * @param hostName hostname 158 * @param walName walname 159 * @param walEvents event from table 160 * @param expectedStates expected states for the hostname and wal name 161 * @param strict whether to check strictly or not. Sometimes we lose the ACTIVE state 162 * event for the first wal since it takes some time for hmaster to create 163 * the table and by that time RS already creates the first WAL and will try 164 * to persist ACTIVE event to waleventtracker table. 165 */ 166 private void compareEvents(String hostName, String walName, 167 List<WALEventTrackerPayload> walEvents, List<String> expectedStates, boolean strict) { 168 List<WALEventTrackerPayload> eventsFilteredByWalName = new ArrayList<>(); 169 170 // Assert that all the events have the same host name i.e they came from the same RS. 171 for (WALEventTrackerPayload event : walEvents) { 172 assertEquals(hostName, event.getRsName()); 173 } 174 175 // Filter the list by walName. 176 for (WALEventTrackerPayload event : walEvents) { 177 if (walName.equals(event.getWalName())) { 178 eventsFilteredByWalName.add(event); 179 } 180 } 181 182 // Assert that the list of events after filtering by walName should be same as expected states. 183 if (strict) { 184 assertEquals(expectedStates.size(), eventsFilteredByWalName.size()); 185 } 186 187 for (WALEventTrackerPayload event : eventsFilteredByWalName) { 188 expectedStates.remove(event.getState()); 189 } 190 assertEquals(0, expectedStates.size()); 191 } 192 193 private void waitForWALEventTrackerTable(Connection connection) throws IOException { 194 TEST_UTIL.waitFor(5000, () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME)); 195 } 196 197 private List<WALEventTrackerPayload> getRows(String rowKeyPrefix, Connection connection) 198 throws IOException { 199 List<WALEventTrackerPayload> list = new ArrayList<>(); 200 Scan scan = new Scan(); 201 scan.withStartRow(Bytes.toBytes(rowKeyPrefix)); 202 Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME); 203 ResultScanner scanner = table.getScanner(scan); 204 205 Result r; 206 while ((r = scanner.next()) != null) { 207 List<Cell> cells = r.listCells(); 208 list.add(getPayload(cells)); 209 } 210 return list; 211 } 212 213 private WALEventTrackerPayload getPayload(List<Cell> cells) { 214 String rsName = null, walName = null, walState = null; 215 long timestamp = 0L, walLength = 0L; 216 for (Cell cell : cells) { 217 byte[] qualifier = CellUtil.cloneQualifier(cell); 218 byte[] value = CellUtil.cloneValue(cell); 219 String qualifierStr = Bytes.toString(qualifier); 220 221 if (RS_COLUMN.equals(qualifierStr)) { 222 rsName = Bytes.toString(value); 223 } else if (WAL_NAME_COLUMN.equals(qualifierStr)) { 224 walName = Bytes.toString(value); 225 } else if (WAL_STATE_COLUMN.equals(qualifierStr)) { 226 walState = Bytes.toString(value); 227 } else if (TIMESTAMP_COLUMN.equals(qualifierStr)) { 228 timestamp = Bytes.toLong(value); 229 } else if (WAL_LENGTH_COLUMN.equals(qualifierStr)) { 230 walLength = Bytes.toLong(value); 231 } 232 } 233 return new WALEventTrackerPayload(rsName, walName, timestamp, walState, walLength); 234 } 235 236 private int getTableCount(Connection connection) throws Exception { 237 Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME); 238 ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM)); 239 int count = 0; 240 while (resultScanner.next() != null) { 241 count++; 242 } 243 LOG.info("Table count: " + count); 244 return count; 245 } 246}