/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ VerySlowRegionServerTests.class, LargeTests.class })
public class TestLogRolling extends AbstractTestLogRolling {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLogRolling.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestLogRolling.class);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
    // profile. See HBASE-9337 for related issues.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    conf.setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so retry the pipeline setup multiple times
    conf.setInt("dfs.client.block.write.retries", 30);
    conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    AbstractTestLogRolling.setUpBeforeClass();
  }

  /**
   * A WAL writer that sleeps for {@code syncLatencyMillis} before each sync, used to simulate
   * slow syncs.
   */
  public static class SlowSyncLogWriter extends ProtobufLogWriter {
    @Override
    public void sync(boolean forceSync) throws IOException {
      try {
        Thread.sleep(syncLatencyMillis);
      } catch (InterruptedException e) {
        InterruptedIOException ex = new InterruptedIOException();
        ex.initCause(e);
        throw ex;
      }
      super.sync(forceSync);
    }
  }

  @Override
  protected void setSlowLogWriter(Configuration conf) {
    conf.set(FSHLogProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName());
  }

  @Override
  protected void setDefaultLogWriter(Configuration conf) {
    conf.set(FSHLogProvider.WRITER_IMPL, ProtobufLogWriter.class.getName());
  }

  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
    throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", (start + i))));
      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
    long startTime = EnvironmentEdgeManager.currentTime();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger calling FSHLog#checkLowReplication()
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (EnvironmentEdgeManager.currentTime() - startTime);
      }
    }
  }

  @Test
  public void testSlowSyncLogRolling() throws Exception {
    // Create the test table
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
    admin.createTable(desc);
    try (Table table = TEST_UTIL.getConnection().getTable(desc.getTableName())) {
      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final AbstractFSWAL<?> log = getWALAndRegisterSlowSyncHook(region);

      // Set default log writer, no additional latency to any sync on the hlog.
      checkSlowSync(log, table, -1, 10, false);

      // Adds 200 ms of latency to any sync on the hlog. This should be more than sufficient to
      // trigger slow sync warnings.
      // Write some data.
      // We need to write at least 5 times, but double it. We should only request
      // a SLOW_SYNC roll once in the current interval.
      checkSlowSync(log, table, 200, 10, true);

      // Adds 5000 ms of latency to any sync on the hlog. This will trip the other threshold.
      // Write some data. Should only take one sync.
      checkSlowSync(log, table, 5000, 1, true);

      // Set default log writer, no additional latency to any sync on the hlog.
      checkSlowSync(log, table, -1, 10, false);
    }
  }

  /**
   * Tests that logs are rolled upon detecting datanode death. Requires an HDFS jar with HDFS-826
   * and syncFs() support (HDFS-200).
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {

    Long oldValue = TEST_UTIL.getConfiguration()
      .getLong("hbase.regionserver.hlog.check.lowreplication.interval", -1);

    try {
      /**
       * When we reuse the code of AsyncFSWAL for FSHLog, low replication is only checked by
       * {@link LogRoller#checkLowReplication}, so in order to make this test spend less time, we
       * should minimize the following config, which is maximized by
       * {@link AbstractTestLogRolling#setUpBeforeClass}.
       */
      TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval",
        1000);
      this.tearDown();
      this.setUp();

      TEST_UTIL.ensureSomeRegionServersAvailable(2);
      assertTrue("This test requires WAL file replication set to 2.",
        fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2);
      LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

      admin.createTable(desc);
      Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final FSHLog log = (FSHLog) server.getWAL(region);
      final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);

      log.registerWALActionsListener(new WALActionsListener() {
        @Override
        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
          switch (reason) {
            case LOW_REPLICATION:
              lowReplicationHookCalled.lazySet(true);
              break;
            default:
              break;
          }
        }
      });

      // add up the datanode count, to ensure proper replication when we kill 1
      // This function is synchronous; when it returns, the dfs cluster is active
      // We start 3 servers and then stop 2 to avoid a directory naming conflict
      // when we stop/start a namenode later, as mentioned in HBASE-5163
      List<DataNode> existingNodes = dfsCluster.getDataNodes();
      int numDataNodes = 3;
      TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval",
        1000);
      dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null);
      List<DataNode> allNodes = dfsCluster.getDataNodes();
      for (int i = allNodes.size() - 1; i >= 0; i--) {
        if (existingNodes.contains(allNodes.get(i))) {
          dfsCluster.stopDataNode(i);
        }
      }

      assertTrue(
"DataNodes " + dfsCluster.getDataNodes().size() + " default replication " 254 + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()), 255 dfsCluster.getDataNodes().size() 256 >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1); 257 258 writeData(table, 2); 259 260 long curTime = EnvironmentEdgeManager.currentTime(); 261 LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName()); 262 long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); 263 assertTrue("Log should have a timestamp older than now", 264 curTime > oldFilenum && oldFilenum != -1); 265 266 assertTrue("The log shouldn't have rolled yet", 267 oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log)); 268 final DatanodeInfo[] pipeline = log.getPipeline(); 269 assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); 270 271 // kill a datanode in the pipeline to force a log roll on the next sync() 272 // This function is synchronous, when it returns the node is killed. 273 assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null); 274 275 // this write should succeed, but trigger a log roll 276 writeData(table, 2); 277 278 TEST_UTIL.waitFor(10000, 100, () -> { 279 long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); 280 return newFilenum > oldFilenum && newFilenum > curTime && lowReplicationHookCalled.get(); 281 }); 282 283 long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); 284 285 // write some more log data (this should use a new hdfs_out) 286 writeData(table, 3); 287 assertTrue("The log should not roll again.", 288 AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum); 289 // kill another datanode in the pipeline, so the replicas will be lower than 290 // the configured value 2. 291 assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null); 292 293 batchWriteAndWait(table, log, 3, false, 14000); 294 int replication = log.getLogReplication(); 295 assertTrue( 296 "LowReplication Roller should've been disabled, current replication=" + replication, 297 !log.isLowReplicationRollEnabled()); 298 299 dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null); 300 301 // Force roll writer. The new log file will have the default replications, 302 // and the LowReplication Roller will be enabled. 303 log.rollWriter(true); 304 batchWriteAndWait(table, log, 13, true, 10000); 305 replication = log.getLogReplication(); 306 assertTrue("New log file should have the default replication instead of " + replication, 307 replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); 308 assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled()); 309 } finally { 310 TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval", 311 oldValue); 312 } 313 } 314 315 /** 316 * Test that WAL is rolled when all data nodes in the pipeline have been restarted. 
   */
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue("This test requires WAL file replication.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1);
    LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    // When the hbase:meta table can be opened, the region servers are running
    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
    try {
      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();

      admin.createTable(desc);
      Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
      final WAL log = server.getWAL(region);
      final List<Path> paths = new ArrayList<>(1);
      final List<Integer> preLogRolledCalled = new ArrayList<>();

      paths.add(AbstractFSWALProvider.getCurrentFileName(log));
      log.registerWALActionsListener(new WALActionsListener() {

        @Override
        public void preLogRoll(Path oldFile, Path newFile) {
          LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
          preLogRolledCalled.add(Integer.valueOf(1));
        }

        @Override
        public void postLogRoll(Path oldFile, Path newFile) {
          paths.add(newFile);
        }
      });

      writeData(table, 1002);

      long curTime = EnvironmentEdgeManager.currentTime();
      LOG.info("log.getCurrentFileName(): " + AbstractFSWALProvider.getCurrentFileName(log));
      long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);
      assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

      assertTrue("The log shouldn't have rolled yet",
        oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log));

      // roll all datanodes in the pipeline
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1002);

      // this write should succeed, but trigger a log roll
      writeData(table, 1003);
      long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log);

      assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);
      validateData(table, 1003);

      writeData(table, 1004);

      // roll all datanodes again
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1004);

      // this write should succeed, but trigger a log roll
      writeData(table, 1005);

      // force a log roll to read back and verify previously written logs
      log.rollWriter(true);
      assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
        preLogRolledCalled.size() >= 1);

      // read back the data written
      Set<String> loggedRows = new HashSet<>();
      for (Path p : paths) {
        LOG.debug("recovering lease for " + p);
        RecoverLeaseFSUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p,
          TEST_UTIL.getConfiguration(), null);

        LOG.debug("Reading WAL " + CommonFSUtils.getPath(p));
        try (WALStreamReader reader =
          WALFactory.createStreamReader(fs, p, TEST_UTIL.getConfiguration())) {
          WAL.Entry entry;
          while ((entry = reader.next()) != null) {
            LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
            for (Cell cell : entry.getEdit().getCells()) {
              loggedRows.add(
                Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
            }
          }
        } catch (EOFException e) {
          LOG.debug("EOF reading file " + CommonFSUtils.getPath(p));
        }
      }

      // verify the written rows are there
      assertTrue(loggedRows.contains("row1002"));
      assertTrue(loggedRows.contains("row1003"));
      assertTrue(loggedRows.contains("row1004"));
      assertTrue(loggedRows.contains("row1005"));

      // flush all regions
      for (HRegion r : server.getOnlineRegionsLocalContext()) {
        try {
          r.flush(true);
        } catch (Exception e) {
          // This try/catch was added by HBASE-14317. It is needed
          // because this issue tightened up the semantics such that
          // a failed append could not be followed by a successful
          // sync. What is coming out here is a failed sync, a sync
          // that used to 'pass'.
          LOG.info(e.toString(), e);
        }
      }

      ResultScanner scanner = table.getScanner(new Scan());
      try {
        for (int i = 2; i <= 5; i++) {
          Result r = scanner.next();
          assertNotNull(r);
          assertFalse(r.isEmpty());
          assertEquals("row100" + i, Bytes.toString(r.getRow()));
        }
      } finally {
        scanner.close();
      }

      // verify that no region servers aborted
      for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
        .getRegionServerThreads()) {
        assertFalse(rsThread.getRegionServer().isAborted());
      }
    } finally {
      if (t != null) t.close();
    }
  }

}