/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.HashSet;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests WALs obtained from a {@link WALFactory} configured with {@link FSHLogProvider}, running
 * against a three-datanode mini DFS cluster.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestFSHLogProvider {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFSHLogProvider.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSHLogProvider.class);

  private static Configuration conf;
  private static FileSystem fs;
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private MultiVersionConcurrencyControl mvcc;

  @Rule
  public final TestName currentTest = new TestName();

  /**
   * Gives every test a fresh MVCC instance and an empty file system: everything directly under
   * the file system root is deleted recursively.
   */
  @Before
  public void setUp() throws Exception {
    mvcc = new MultiVersionConcurrencyControl();
    FileStatus[] entries = fs.listStatus(new Path("/"));
    for (FileStatus dir : entries) {
      fs.delete(dir.getPath(), true);
    }
  }

  /**
   * Starts the shared mini DFS cluster and publishes its configuration and file system to the
   * tests through the static {@code conf} and {@code fs} fields.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
    // quicker heartbeat interval for faster DN death notification
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);

    // faster failover with cluster.shutdown();fs.close() idiom
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.startMiniDFSCluster(3);

    // Set up a working space for our tests.
    TEST_UTIL.createRootDir();
    conf = TEST_UTIL.getConfiguration();
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Checks that {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} returns
   * {@code null} for inputs that do not point below a server's WAL directory, and returns the
   * server name for paths below a regular or a "-splitting" WAL directory.
   */
  @Test
  public void testGetServerNameFromWALDirectoryName() throws IOException {
    ServerName sn = ServerName.valueOf("hn", 450, 1398);
    String hl = CommonFSUtils.getRootDir(conf) + "/"
      + AbstractFSWALProvider.getWALDirectoryName(sn.toString());

    // Must not throw exception
    assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, null));
    assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf,
      CommonFSUtils.getRootDir(conf).toUri().toString()));
    assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, ""));
    assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, " "));
    assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, hl));
    assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, hl + "qdf"));
    assertNull(AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, "sfqf" + hl + "qdf"));

    final String wals = "/WALs/";
    ServerName parsed = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf,
      CommonFSUtils.getRootDir(conf).toUri().toString() + wals + sn
        + "/localhost%2C32984%2C1343316388997.1343316390417");
    assertEquals("standard", sn, parsed);

    parsed = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, hl + "/qdf");
    assertEquals("subdir", sn, parsed);

    parsed = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf,
      CommonFSUtils.getRootDir(conf).toUri().toString() + wals + sn
        + "-splitting/localhost%3A57020.1340474893931");
    assertEquals("split", sn, parsed);
  }

  /**
   * Builds a table descriptor with the given table name and a single column family "row".
   */
  private static TableDescriptor newTableDescriptor(String tableName) {
    return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
  }

  /**
   * Builds the scopes map passed to the WAL key: every column family of {@code htd} mapped to 0.
   */
  private static NavigableMap<byte[], Integer> newScopes(TableDescriptor htd) {
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    return scopes;
  }

  /**
   * Creates a {@link WALFactory} named after the running test whose WAL provider is
   * {@link FSHLogProvider}. The shared configuration is copied, so it is not modified. The caller
   * is responsible for closing the returned factory.
   */
  private WALFactory newWALFactory() throws IOException {
    Configuration localConf = new Configuration(conf);
    localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName());
    return new WALFactory(localConf, currentTest.getMethodName());
  }

  /**
   * Appends {@code times} single-cell edits for the given region to {@code log}, then syncs once.
   */
  private void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
    NavigableMap<byte[], Integer> scopes) throws IOException {
    final byte[] row = Bytes.toBytes("row");
    for (int i = 0; i < times; i++) {
      long timestamp = EnvironmentEdgeManager.currentTime();
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, row, row, timestamp, row));
      log.appendData(hri,
        getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes), cols);
    }
    log.sync();
  }

  /**
   * Creates the WAL key for an edit, bound to this test's MVCC instance.
   */
  private WALKeyImpl getWalKey(final byte[] info, final TableName tableName, final long timestamp,
    NavigableMap<byte[], Integer> scopes) {
    return new WALKeyImpl(info, tableName, timestamp, mvcc, scopes);
  }

  /**
   * helper method to simulate region flush for a WAL.
   */
  private void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
    wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
  }

  // now we will close asynchronously and will not archive a wal file unless it is fully closed, so
  // sometimes we need to wait a bit before asserting, especially when you want to test the removal
  // of numRolledLogFiles
  private void waitNumRolledLogFiles(WAL wal, int expected) {
    TEST_UTIL.waitFor(5000, () -> AbstractFSWALProvider.getNumRolledLogFiles(wal) == expected);
  }

  /**
   * Drives the log-cleaning scenario against the WAL of the given factory: edits from two regions
   * are interleaved with rolls and flushes, and the number of rolled log files is checked after
   * every roll.
   */
  private void testLogCleaning(WALFactory wals) throws IOException {
    TableDescriptor htd = newTableDescriptor(currentTest.getMethodName());
    TableDescriptor htd2 = newTableDescriptor(currentTest.getMethodName() + "2");
    NavigableMap<byte[], Integer> scopes1 = newScopes(htd);
    NavigableMap<byte[], Integer> scopes2 = newScopes(htd2);
    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    RegionInfo hri2 = RegionInfoBuilder.newBuilder(htd2.getTableName()).build();
    // we want to mix edits from regions, so pick our own identifier.
    WAL log = wals.getWAL(null);

    // Add a single edit and make sure that rolling won't remove the file
    // Before HBASE-3198 it used to delete it
    addEdits(log, hri, htd, 1, scopes1);
    log.rollWriter();
    waitNumRolledLogFiles(log, 1);

    // See if there's anything wrong with more than 1 edit
    addEdits(log, hri, htd, 2, scopes1);
    log.rollWriter();
    // Checked through the waiting helper like every other count in this scenario.
    waitNumRolledLogFiles(log, 2);

    // Now mix edits from 2 regions, still no flushing
    addEdits(log, hri, htd, 1, scopes1);
    addEdits(log, hri2, htd2, 1, scopes2);
    addEdits(log, hri, htd, 1, scopes1);
    addEdits(log, hri2, htd2, 1, scopes2);
    log.rollWriter();
    waitNumRolledLogFiles(log, 3);

    // Flush the first region, we expect to see the first two files getting
    // archived. We need to append something or writer won't be rolled.
    addEdits(log, hri2, htd2, 1, scopes2);
    flushRegion(log, hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
    log.rollWriter();
    waitNumRolledLogFiles(log, 2);

    // Flush the second region, which removes all the remaining output files
    // since the oldest was completely flushed and the two others only contain
    // flush information
    addEdits(log, hri2, htd2, 1, scopes2);
    flushRegion(log, hri2.getEncodedNameAsBytes(), htd2.getColumnFamilyNames());
    log.rollWriter();
    waitNumRolledLogFiles(log, 0);
  }

  @Test
  public void testLogCleaning() throws Exception {
    LOG.info(currentTest.getMethodName());
    WALFactory wals = newWALFactory();
    try {
      testLogCleaning(wals);
    } finally {
      wals.close();
    }
  }

  /**
   * Drives the archiving scenario against the WAL of the given factory; see
   * {@link #testWALArchiving()} for the rule being exercised.
   */
  private void testWALArchiving(WALFactory wals) throws IOException {
    TableDescriptor table1 = newTableDescriptor(currentTest.getMethodName() + "1");
    TableDescriptor table2 = newTableDescriptor(currentTest.getMethodName() + "2");
    NavigableMap<byte[], Integer> scopes1 = newScopes(table1);
    NavigableMap<byte[], Integer> scopes2 = newScopes(table2);
    WAL wal = wals.getWAL(null);
    assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal));
    RegionInfo hri1 = RegionInfoBuilder.newBuilder(table1.getTableName()).build();
    RegionInfo hri2 = RegionInfoBuilder.newBuilder(table2.getTableName()).build();
    // start with the testing logic: insert a waledit, and roll writer
    addEdits(wal, hri1, table1, 1, scopes1);
    wal.rollWriter();
    // assert that the wal is rolled
    waitNumRolledLogFiles(wal, 1);
    // add edits in the second wal file, and roll writer.
    addEdits(wal, hri1, table1, 1, scopes1);
    wal.rollWriter();
    // assert that the wal is rolled
    waitNumRolledLogFiles(wal, 2);
    // add a waledit to table1, and flush the region.
    addEdits(wal, hri1, table1, 3, scopes1);
    flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getColumnFamilyNames());
    // roll log; all old logs should be archived.
    wal.rollWriter();
    waitNumRolledLogFiles(wal, 0);
    // add an edit to table2, and roll writer
    addEdits(wal, hri2, table2, 1, scopes2);
    wal.rollWriter();
    waitNumRolledLogFiles(wal, 1);
    // add edits for table1, and roll writer
    addEdits(wal, hri1, table1, 2, scopes1);
    wal.rollWriter();
    waitNumRolledLogFiles(wal, 2);
    // add edits for table2, and flush hri1.
    addEdits(wal, hri2, table2, 2, scopes2);
    // hri1 belongs to table1, so flush it with table1's families (both tables only have "row").
    flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getColumnFamilyNames());
    // the log : region-sequenceId map is
    // log1: region2 (unflushed)
    // log2: region1 (flushed)
    // log3: region2 (unflushed)
    // roll the writer; log2 should be archived.
    wal.rollWriter();
    waitNumRolledLogFiles(wal, 2);
    // flush region2, and all logs should be archived.
    addEdits(wal, hri2, table2, 2, scopes2);
    flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getColumnFamilyNames());
    wal.rollWriter();
    waitNumRolledLogFiles(wal, 0);
  }

  /**
   * Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs and
   * also don't archive "live logs" (that is, a log with un-flushed entries).
   * <p>
   * This is what it does: It creates two regions, and does a series of inserts along with log
   * rolling. Whenever a WAL is rolled, the previous wals are checked for archiving. A wal is
   * eligible for archiving if all the regions which have entries in that wal file have flushed
   * past their maximum sequence id in that wal file.
   */
  @Test
  public void testWALArchiving() throws IOException {
    LOG.debug(currentTest.getMethodName());

    WALFactory wals = newWALFactory();
    try {
      testWALArchiving(wals);
    } finally {
      wals.close();
    }
  }

  /**
   * Write to a log file with three concurrent threads and verifying all data is written.
   */
  @Test
  public void testConcurrentWrites() throws Exception {
    // Run the WPE tool with three threads writing 3000 edits each concurrently.
    // When done, verify that all edits were written.
    int errCode =
      WALPerformanceEvaluation.innerMain(new Configuration(TEST_UTIL.getConfiguration()),
        new String[] { "-threads", "3", "-verify", "-noclosefs", "-iterations", "3000" });
    assertEquals(0, errCode);
  }

  /**
   * Ensure that we can use Set.add to deduplicate WALs
   */
  @Test
  public void setMembershipDedups() throws IOException {
    WALFactory wals = newWALFactory();
    try {
      final Set<WAL> seen = new HashSet<>(1);
      assertTrue("first attempt to add WAL from default provider should work.",
        seen.add(wals.getWAL(null)));
      for (int i = 0; i < 1000; i++) {
        assertFalse(
          "default wal provider is only supposed to return a single wal, which should "
            + "compare as .equals itself.",
          seen.add(wals.getWAL(RegionInfoBuilder
            .newBuilder(TableName.valueOf("Table-" + ThreadLocalRandom.current().nextInt()))
            .build())));
      }
    } finally {
      wals.close();
    }
  }
}