/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.FlushPolicy;
import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SequenceId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shared test cases for {@link AbstractFSWAL} implementations. Concrete subclasses supply the WAL
 * under test through {@link #newWAL} and {@link #newSlowWAL}; every test here runs against a
 * three-datanode mini DFS cluster started once per class.
 */
public abstract class AbstractTestFSWAL {

  protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestFSWAL.class);

  protected static Configuration CONF;
  protected static FileSystem FS;
  protected static Path DIR;
  protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  @Rule
  public final TestName currentTest = new TestName();

  /**
   * Wipes everything under the filesystem root, recreates the HBase root and WAL root directories
   * and points {@link #DIR} at a per-test-method directory under the WAL root.
   */
  @Before
  public void setUp() throws Exception {
    FileStatus[] entries = FS.listStatus(new Path("/"));
    for (FileStatus dir : entries) {
      FS.delete(dir.getPath(), true);
    }
    final Path hbaseDir = TEST_UTIL.createRootDir();
    final Path hbaseWALDir = TEST_UTIL.createWALRootDir();
    DIR = new Path(hbaseWALDir, currentTest.getMethodName());
    assertNotEquals(hbaseDir, hbaseWALDir);
  }

  /**
   * Tunes DFS/IPC settings for fast failure detection, registers the sample WAL coprocessor and
   * starts the mini DFS cluster that backs {@link #FS}.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
    // quicker heartbeat interval for faster DN death notification
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);

    // faster failover with cluster.shutdown();fs.close() idiom
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
      SampleRegionWALCoprocessor.class.getName());
    TEST_UTIL.startMiniDFSCluster(3);

    CONF = TEST_UTIL.getConfiguration();
    FS = TEST_UTIL.getDFSCluster().getFileSystem();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Creates the WAL implementation under test.
   */
  protected abstract AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String WALDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix) throws IOException;

  /**
   * Creates a WAL implementation under test that is handed {@code action}; the tests in this class
   * use the action to slow appends down.
   */
  protected abstract AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String WALDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix, Runnable action) throws IOException;

  /**
   * Builds a replication-scope map that assigns scope 0 to every column family of {@code td}.
   */
  private static NavigableMap<byte[], Integer> scopesOf(TableDescriptor td) {
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : td.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    return scopes;
  }

  /**
   * A loaded WAL coprocessor won't break existing WAL test cases.
   */
  @Test
  public void testWALCoprocessorLoaded() throws Exception {
    // test to see whether the coprocessor is loaded or not.
    try (AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
      WALCoprocessorHost host = wal.getCoprocessorHost();
      Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
      assertNotNull(c);
    }
  }

  /**
   * Appends {@code times} single-cell edits for region {@code hri} to {@code log} and then syncs.
   * The string {@code cf} is used as row, family, qualifier and value of every cell.
   */
  protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
    MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes, String cf)
    throws IOException {
    final byte[] row = Bytes.toBytes(cf);
    for (int i = 0; i < times; i++) {
      long timestamp = EnvironmentEdgeManager.currentTime();
      WALEdit cols = new WALEdit();
      WALEditInternalHelper.addExtendedCell(cols, new KeyValue(row, row, row, timestamp, row));
      WALKeyImpl key =
        new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), SequenceId.NO_SEQUENCE_ID,
          timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
      log.appendData(hri, key, cols);
    }
    log.sync();
  }

  /**
   * helper method to simulate region flush for a WAL.
   */
  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
    wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
  }

  /**
   * tests the log comparator. Ensure that we are not mixing meta logs with non-meta logs (throws
   * exception if we do). Comparison is based on the timestamp present in the wal name.
   */
  @Test
  public void testWALComparator() throws Exception {
    // Nested try-with-resources: each WAL is closed even if closing the other one fails.
    try (AbstractFSWAL<?> wal1 = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
      LOG.debug("Log obtained is: {}", wal1);
      Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
      Path p1 = wal1.computeFilename(11);
      Path p2 = wal1.computeFilename(12);
      // comparing with itself returns 0
      assertEquals(0, comp.compare(p1, p1));
      // comparing with different filenum.
      assertTrue(comp.compare(p1, p2) < 0);
      try (AbstractFSWAL<?> walMeta = newWAL(FS, CommonFSUtils.getWALRootDir(CONF),
        DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null,
        AbstractFSWALProvider.META_WAL_PROVIDER_ID)) {
        Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;

        Path p1WithMeta = walMeta.computeFilename(11);
        Path p2WithMeta = walMeta.computeFilename(12);
        assertEquals(0, compMeta.compare(p1WithMeta, p1WithMeta));
        assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
        // mixing meta and non-meta logs gives error
        boolean ex = false;
        try {
          comp.compare(p1WithMeta, p2);
        } catch (IllegalArgumentException e) {
          ex = true;
        }
        assertTrue("Comparator doesn't complain while checking meta log files", ex);
        boolean exMeta = false;
        try {
          compMeta.compare(p1WithMeta, p2);
        } catch (IllegalArgumentException e) {
          exMeta = true;
        }
        assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
      }
    }
  }

  // now we will close asynchronously and will not archive a wal file unless it is fully closed, so
  // sometimes we need to wait a bit before asserting, especially when you want to test the removal
  // of numRolledLogFiles
  private void waitNumRolledLogFiles(AbstractFSWAL<?> wal, int expected) {
    TEST_UTIL.waitFor(5000, () -> wal.getNumRolledLogFiles() == expected);
  }

  /**
   * Drives {@code wal} through a sequence of appends, rolls and simulated flushes, checking after
   * each step how many rolled files remain and which regions/families
   * {@code findRegionsToForceFlush()} reports.
   */
  private void testFindMemStoresEligibleForFlush(AbstractFSWAL<?> wal) throws IOException {
    String cf1 = "cf1";
    String cf2 = "cf2";
    String cf3 = "cf3";
    TableDescriptor t1 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
    TableDescriptor t2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t2"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
    RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build();
    RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build();

    List<ColumnFamilyDescriptor> cfs = new ArrayList<>();
    cfs.add(ColumnFamilyDescriptorBuilder.of(cf1));
    cfs.add(ColumnFamilyDescriptorBuilder.of(cf2));
    TableDescriptor t3 =
      TableDescriptorBuilder.newBuilder(TableName.valueOf("t3")).setColumnFamilies(cfs).build();
    RegionInfo hri3 = RegionInfoBuilder.newBuilder(t3.getTableName()).build();

    // add edits and roll the wal
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    NavigableMap<byte[], Integer> scopes1 = scopesOf(t1);
    NavigableMap<byte[], Integer> scopes2 = scopesOf(t2);
    NavigableMap<byte[], Integer> scopes3 = scopesOf(t3);
    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
    wal.rollWriter();
    // add some more edits and roll the wal. This would reach the log number threshold
    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
    wal.rollWriter();
    // with above rollWriter call, the max logs limit is reached.
    waitNumRolledLogFiles(wal, 2);

    // get the regions to flush; since there is only one region in the oldest wal, it should
    // return only one region.
    Map<byte[], List<byte[]>> regionsToFlush = wal.findRegionsToForceFlush();
    assertEquals(1, regionsToFlush.size());
    // compare array contents; assertEquals on byte[] would only compare references
    assertArrayEquals(hri1.getEncodedNameAsBytes(), regionsToFlush.keySet().iterator().next());
    // insert edits in second region
    addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
    // get the regions to flush, it should still read region1.
    regionsToFlush = wal.findRegionsToForceFlush();
    assertEquals(1, regionsToFlush.size());
    assertArrayEquals(hri1.getEncodedNameAsBytes(), regionsToFlush.keySet().iterator().next());
    // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
    // remain.
    flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
    wal.rollWriter();
    // only one wal should remain now (that is for the second region).
    waitNumRolledLogFiles(wal, 1);
    // flush the second region
    flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
    wal.rollWriter(true);
    // no wal should remain now.
    waitNumRolledLogFiles(wal, 0);
    // add edits both to region 1 and region 2, and roll.
    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
    addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
    wal.rollWriter();
    // add edits and roll the writer, to reach the max logs limit.
    waitNumRolledLogFiles(wal, 1);
    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
    wal.rollWriter();
    // it should return two regions to flush, as the oldest wal file has entries
    // for both regions.
    regionsToFlush = wal.findRegionsToForceFlush();
    assertEquals(2, regionsToFlush.size());
    // flush both regions
    flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
    flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
    wal.rollWriter(true);
    waitNumRolledLogFiles(wal, 0);
    // Add an edit to region1, and roll the wal.
    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
    // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
    wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
    wal.rollWriter();
    wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
    waitNumRolledLogFiles(wal, 1);

    // clear test data
    flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
    wal.rollWriter(true);
    // add edits for three families
    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf2);
    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf3);
    wal.rollWriter();
    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
    wal.rollWriter();
    waitNumRolledLogFiles(wal, 2);
    // flush one family before archive oldest wal
    Set<byte[]> flushedFamilyNames = new HashSet<>();
    flushedFamilyNames.add(Bytes.toBytes(cf1));
    flushRegion(wal, hri3.getEncodedNameAsBytes(), flushedFamilyNames);
    regionsToFlush = wal.findRegionsToForceFlush();
    // then only two family need to be flushed when archive oldest wal
    assertEquals(1, regionsToFlush.size());
    assertArrayEquals(hri3.getEncodedNameAsBytes(), regionsToFlush.keySet().iterator().next());
    assertEquals(2, regionsToFlush.get(hri3.getEncodedNameAsBytes()).size());
  }

  /**
   * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of
   * regions which should be flushed in order to archive the oldest wal file.
   * <p>
   * This method tests this behavior by inserting edits and rolling the wal enough times to reach
   * the max number of logs threshold. It checks whether we get the "right regions and stores" for
   * flush on rolling the wal.
   */
  @Test
  public void testFindMemStoresEligibleForFlush() throws Exception {
    LOG.debug("testFindMemStoresEligibleForFlush");
    Configuration conf1 = HBaseConfiguration.create(CONF);
    conf1.setInt("hbase.regionserver.maxlogs", 1);
    try (AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
      HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null)) {
      testFindMemStoresEligibleForFlush(wal);
    }
  }

  /**
   * Creating a writer must fail with an {@link IOException} once the WAL's parent directory has
   * been renamed away.
   */
  @Test(expected = IOException.class)
  public void testFailedToCreateWALIfParentRenamed()
    throws IOException, CommonFSUtils.StreamLacksCapabilityException {
    final String name = "testFailedToCreateWALIfParentRenamed";
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), name,
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
    long filenum = EnvironmentEdgeManager.currentTime();
    Path path = wal.computeFilename(filenum);
    wal.createWriterInstance(FS, path);
    Path parent = path.getParent();
    path = wal.computeFilename(filenum + 1);
    Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
    FS.rename(parent, newPath);
    wal.createWriterInstance(FS, path);
    fail("It should fail to create the new WAL");
  }

  /**
   * Test flush for sure has a sequence id that is beyond the last edit appended. We do this by
   * slowing appends in the background ring buffer thread while in foreground we call flush. The
   * addition of the sync over HRegion in flush should fix an issue where flush was returning before
   * all of its appends had made it out to the WAL (HBASE-11109).
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-11109">HBASE-11109</a>
   */
  @Test
  public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
    String testName = currentTest.getMethodName();
    final TableName tableName = TableName.valueOf(testName);
    final RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
    final byte[] rowName = tableName.getName();
    final TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build();
    HRegion r = HBaseTestingUtil.createRegionAndWAL(hri, TEST_UTIL.getDefaultRootDirPath(),
      TEST_UTIL.getConfiguration(), htd);
    HBaseTestingUtil.closeRegionAndWAL(r);
    final int countPerFamily = 10;
    final AtomicBoolean goslow = new AtomicBoolean(false);
    NavigableMap<byte[], Integer> scopes = scopesOf(htd);
    // subclass and doctor a method.
    AbstractFSWAL<?> wal = newSlowWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
      testName, CONF, null, true, null, null, () -> {
        if (goslow.get()) {
          Threads.sleep(100);
          LOG.debug("Sleeping before appending 100ms");
        }
      });
    HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
      TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
    EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
    try {
      List<Put> puts = null;
      for (byte[] fam : htd.getColumnFamilyNames()) {
        puts = TestWALReplay.addRegionEdits(rowName, fam, countPerFamily, ee, region, "x");
      }

      // Now assert edits made it in.
      final Get g = new Get(rowName);
      Result result = region.get(g);
      assertEquals(countPerFamily * htd.getColumnFamilyNames().size(), result.size());

      // Construct a WALEdit and add it a few times to the WAL.
      WALEdit edits = new WALEdit();
      for (Put p : puts) {
        ExtendedCellScanner cs = p.cellScanner();
        while (cs.advance()) {
          WALEditInternalHelper.addExtendedCell(edits, cs.current());
        }
      }
      // Add any old cluster id.
      List<UUID> clusterIds = new ArrayList<>(1);
      clusterIds.add(TEST_UTIL.getRandomUUID());
      // Now make appends run slow.
      goslow.set(true);
      for (int i = 0; i < countPerFamily; i++) {
        final RegionInfo info = region.getRegionInfo();
        final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
          EnvironmentEdgeManager.currentTime(), clusterIds, -1, -1, region.getMVCC(), scopes);
        wal.append(info, logkey, edits, true);
        region.getMVCC().completeAndWait(logkey.getWriteEntry());
      }
      region.flush(true);
      // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
      long currentSequenceId = region.getReadPoint(null);
      // Now release the appends
      goslow.set(false);
      assertTrue(currentSequenceId >= region.getReadPoint(null));
    } finally {
      region.close(true);
      wal.close();
    }
  }

  /**
   * Calling sync on a WAL that has had no appends must not throw.
   */
  @Test
  public void testSyncNoAppend() throws IOException {
    String testName = currentTest.getMethodName();
    try (AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
      testName, CONF, null, true, null, null)) {
      wal.sync();
    }
  }

  /**
   * Appending to an already closed WAL must fail with an {@link IOException} and leave the key's
   * write entry unset.
   */
  @Test
  public void testWriteEntryCanBeNull() throws IOException {
    String testName = currentTest.getMethodName();
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
      CONF, null, true, null, null);
    // closed on purpose: the append below is expected to be rejected
    wal.close();
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
    RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    NavigableMap<byte[], Integer> scopes = scopesOf(td);
    long timestamp = EnvironmentEdgeManager.currentTime();
    byte[] row = Bytes.toBytes("row");
    WALEdit cols = new WALEdit();
    WALEditInternalHelper.addExtendedCell(cols, new KeyValue(row, row, row, timestamp, row));
    WALKeyImpl key =
      new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID,
        timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
    try {
      wal.append(ri, key, cols, true);
      fail("Should fail since the wal has already been closed");
    } catch (IOException e) {
      // expected
      assertThat(e.getMessage(), containsString("log is closed"));
      // the WriteEntry should be null since we fail before setting it.
      assertNull(key.getWriteEntry());
    }
  }

  /**
   * Rolling the writer of a closed WAL must throw {@link WALClosedException}.
   */
  @Test(expected = WALClosedException.class)
  public void testRollWriterForClosedWAL() throws IOException {
    String testName = currentTest.getMethodName();
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
      CONF, null, true, null, null);
    wal.close();
    wal.rollWriter();
  }

  /**
   * Creates a WAL with a listener that, once {@code startHoldingForAppend} is set, blocks in
   * {@code visitLogEntryBeforeWrite} until {@code holdAppend} is counted down.
   */
  private AbstractFSWAL<?> createHoldingWAL(String testName, AtomicBoolean startHoldingForAppend,
    CountDownLatch holdAppend) throws IOException {
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
    // newWAL has already called wal.init()
    wal.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
        if (startHoldingForAppend.get()) {
          try {
            holdAppend.await();
          } catch (InterruptedException e) {
            LOG.error(e.toString(), e);
          }
        }
      }
    });
    return wal;
  }

  /**
   * Creates the region for {@code htd} on disk, closes it, and reopens it against {@code wal} with
   * mocked {@link RegionServerServices} that return {@code conf}.
   */
  private HRegion createHoldingHRegion(Configuration conf, TableDescriptor htd, WAL wal)
    throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    TEST_UTIL.createLocalHRegion(hri, CONF, htd, wal).close();
    RegionServerServices rsServices = mock(RegionServerServices.class);
    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
    when(rsServices.getConfiguration()).thenReturn(conf);
    return HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, conf, rsServices, null);
  }

  /**
   * Performs a regular put, then an ASYNC_WAL put whose WAL append is held back, submits
   * {@code flushOrCloseRegion} to {@code exec} while the append is still held, releases the append
   * and finally waits on {@code flushOrCloseFinished}.
   */
  private void doPutWithAsyncWAL(ExecutorService exec, HRegion region, Put put,
    Runnable flushOrCloseRegion, AtomicBoolean startHoldingForAppend,
    CountDownLatch flushOrCloseFinished, CountDownLatch holdAppend)
    throws InterruptedException, IOException {
    // do a regular write first because of memstore size calculation.
    region.put(put);

    startHoldingForAppend.set(true);
    region.put(new Put(put).setDurability(Durability.ASYNC_WAL));

    // give the put a chance to start
    Threads.sleep(3000);

    exec.submit(flushOrCloseRegion);

    // give the flush a chance to start. Flush should have got the region lock, and
    // should have been waiting on the mvcc complete after this.
    Threads.sleep(3000);

    // let the append to WAL go through now that the flush already started
    holdAppend.countDown();
    flushOrCloseFinished.await();
  }

  // Testcase for HBASE-23181
  @Test
  public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
    String testName = currentTest.getMethodName();
    byte[] b = Bytes.toBytes("b");
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();

    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
    CountDownLatch holdAppend = new CountDownLatch(1);
    CountDownLatch closeFinished = new CountDownLatch(1);
    ExecutorService exec = Executors.newFixedThreadPool(1);
    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
    // open a new region which uses this WAL
    HRegion region = createHoldingHRegion(TEST_UTIL.getConfiguration(), htd, wal);
    try {
      doPutWithAsyncWAL(exec, region, new Put(b).addColumn(b, b, b), () -> {
        try {
          Map<?, ?> closeResult = region.close();
          LOG.info("Close result:{}", closeResult);
          closeFinished.countDown();
        } catch (IOException e) {
          LOG.error(e.toString(), e);
        }
      }, startHoldingForAppend, closeFinished, holdAppend);

      // now check the region's unflushed seqIds.
      long seqId = getEarliestMemStoreSeqNum(wal, region.getRegionInfo().getEncodedNameAsBytes());
      assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
        seqId);
    } finally {
      exec.shutdownNow();
      region.close();
      wal.close();
    }
  }

  /**
   * Returns the lowest sequence id the WAL's sequence-id accounting holds for the given region, or
   * {@link HConstants#NO_SEQNUM} when {@code wal} is {@code null} or not an {@link AbstractFSWAL}.
   */
  public static long getEarliestMemStoreSeqNum(WAL wal, byte[] encodedRegionName) {
    // instanceof is false for null, so no separate null check is needed
    if (wal instanceof AbstractFSWAL) {
      return ((AbstractFSWAL<?>) wal).getSequenceIdAccounting()
        .getLowestSequenceId(encodedRegionName);
    }
    return HConstants.NO_SEQNUM;
  }

  // Families that FlushSpecificStoresPolicy should flush; empty means "flush every store".
  private static final Set<byte[]> STORES_TO_FLUSH =
    Collections.newSetFromMap(new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR));

  // Testcase for HBASE-23157
  @Test
  public void testMaxFlushedSequenceIdGoBackwards() throws IOException, InterruptedException {
    String testName = currentTest.getMethodName();
    byte[] a = Bytes.toBytes("a");
    byte[] b = Bytes.toBytes("b");
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(a))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();

    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
    CountDownLatch holdAppend = new CountDownLatch(1);
    CountDownLatch flushFinished = new CountDownLatch(1);
    ExecutorService exec = Executors.newFixedThreadPool(2);
    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.setClass(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushSpecificStoresPolicy.class,
      FlushPolicy.class);
    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
    // open a new region which uses this WAL
    HRegion region = createHoldingHRegion(conf, htd, wal);
    try {
      Put put = new Put(a).addColumn(a, a, a).addColumn(b, b, b);
      doPutWithAsyncWAL(exec, region, put, () -> {
        try {
          HRegion.FlushResult flushResult = region.flush(true);
          LOG.info("Flush result:{}", flushResult.getResult());
          LOG.info("Flush succeeded:{}", flushResult.isFlushSucceeded());
          flushFinished.countDown();
        } catch (IOException e) {
          LOG.error(e.toString(), e);
        }
      }, startHoldingForAppend, flushFinished, holdAppend);

      // get the max flushed sequence id after the first flush
      long maxFlushedSeqId1 = region.getMaxFlushedSeqId();

      region.put(put);
      // this time we only flush family a
      STORES_TO_FLUSH.add(a);
      region.flush(false);

      // get the max flushed sequence id after the second flush
      long maxFlushedSeqId2 = region.getMaxFlushedSeqId();
      // make sure that the maxFlushedSequenceId does not go backwards
      assertTrue("maxFlushedSeqId went backwards: maxFlushedSeqId1(" + maxFlushedSeqId1
        + ") is greater than maxFlushedSeqId2(" + maxFlushedSeqId2 + ")",
        maxFlushedSeqId1 <= maxFlushedSeqId2);
    } finally {
      // STORES_TO_FLUSH is static; reset it so the selection does not leak into later runs
      STORES_TO_FLUSH.clear();
      exec.shutdownNow();
      region.close();
      wal.close();
    }
  }

  /**
   * Flush policy that selects exactly the stores named in {@code STORES_TO_FLUSH}, or every store
   * of the region when that set is empty.
   */
  public static final class FlushSpecificStoresPolicy extends FlushPolicy {

    @Override
    public Collection<HStore> selectStoresToFlush() {
      if (STORES_TO_FLUSH.isEmpty()) {
        return region.getStores();
      } else {
        return STORES_TO_FLUSH.stream().map(region::getStore).collect(Collectors.toList());
      }
    }
  }
}