/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * Test for the case where a regionserver going down has enough cycles to do damage to regions that
 * have actually been assigned elsewhere.
 * <p>
 * If we happen to assign a region before it is fully done within its old location -- i.e. it is on
 * two servers at the same time -- all can work fine until the case where the region on the dying
 * server decides to compact or otherwise change the region file set. The region in its new location
 * will then get a surprise when it tries to do something w/ a file removed by the region in its old
 * location on dying server.
 * <p>
 * Making a test for this case is a little tough in that even if a file is deleted up on the
 * namenode, if the file was opened before the delete, it will continue to let reads happen until
 * something changes the state of cached blocks in the dfsclient that was already open (a block from
 * the deleted file is cleaned from the datanode by NN).
 * <p>
 * What we will do below is do an explicit check for existence on the files listed in the region
 * that has had some files removed because of a compaction. This sort of hurries along and makes
 * certain what is a chance occurrence.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestIOFencing {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestIOFencing.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestIOFencing.class);
  static {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    // ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
    // ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    // ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    // ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
    // .getLogger().setLevel(Level.ALL);
    // ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
  }

  /**
   * An {@link HRegion} that counts finished compaction calls and carries the two latches the test
   * uses to park a compaction and to learn that a compaction has reached the parking point.
   * Subclasses decide where in the compaction the latches are consulted.
   */
  public abstract static class CompactionBlockerRegion extends HRegion {
    /** Incremented every time one of the {@code compact} overrides returns or throws. */
    AtomicInteger compactCount = new AtomicInteger();
    /** Compactions await this latch; a zero count (the initial state) means "do not block". */
    volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
    /** Counted down by a compaction when it reaches the blocking point. */
    volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);

    @SuppressWarnings("deprecation")
    public CompactionBlockerRegion(Path tableDir, WAL log, FileSystem fs, Configuration confParam,
      RegionInfo info, TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    /**
     * Arms both latches with a count of one so that the next compaction to reach the blocking point
     * waits there until {@link #allowCompactions()} is called.
     */
    public void stopCompactions() {
      compactionsBlocked = new CountDownLatch(1);
      compactionsWaiting = new CountDownLatch(1);
    }

    /** Releases any compaction currently awaiting {@link #compactionsBlocked}. */
    public void allowCompactions() {
      LOG.debug("allowing compactions");
      compactionsBlocked.countDown();
    }

    /**
     * Blocks the caller until a compaction has counted down {@link #compactionsWaiting}.
     * @throws IOException wrapping the {@link InterruptedException} if the wait is interrupted
     */
    public void waitForCompactionToBlock() throws IOException {
      try {
        LOG.debug("waiting for compaction to block");
        compactionsWaiting.await();
        LOG.debug("compaction block reached");
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
    }

    /**
     * Delegates to the superclass and bumps {@link #compactCount} afterwards, whether the
     * compaction returned normally or threw.
     */
    @Override
    public boolean compact(CompactionContext compaction, HStore store,
      ThroughputController throughputController) throws IOException {
      try {
        return super.compact(compaction, store, throughputController);
      } finally {
        compactCount.getAndIncrement();
      }
    }

    /**
     * Delegates to the superclass and bumps {@link #compactCount} afterwards, whether the
     * compaction returned normally or threw.
     */
    @Override
    public boolean compact(CompactionContext compaction, HStore store,
      ThroughputController throughputController, User user) throws IOException {
      try {
        return super.compact(compaction, store, throughputController, user);
      } finally {
        compactCount.getAndIncrement();
      }
    }

    /** Returns the sum of {@link HStore#getStorefilesCount()} over all stores of this region. */
    public int countStoreFiles() {
      int count = 0;
      for (HStore store : stores.values()) {
        count += store.getStorefilesCount();
      }
      return count;
    }
  }

  /**
   * An override of HRegion that allows us to park compactions in a holding pattern and then when
   * appropriate for the test, allow them to proceed again.
   */
  public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {

    public BlockCompactionsInPrepRegion(Path tableDir, WAL log, FileSystem fs,
      Configuration confParam, RegionInfo info, TableDescriptor htd,
      RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    /**
     * Signals that a compaction has arrived, waits until compactions are allowed, and only then
     * runs the superclass preparation step.
     * @throws IOException wrapping the {@link InterruptedException} if the wait is interrupted
     */
    @Override
    protected void doRegionCompactionPrep() throws IOException {
      compactionsWaiting.countDown();
      try {
        compactionsBlocked.await();
      } catch (InterruptedException ex) {
        // Keep the cause so that the interruption is visible in the resulting stack trace.
        throw new IOException(ex);
      }
      super.doRegionCompactionPrep();
    }
  }

  /**
   * An override of HRegion that allows us to park compactions in a holding pattern and then when
   * appropriate for the test, allow them to proceed again. This allows the compaction entry to go
   * to the WAL before blocking, but blocks afterwards.
   */
  public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
    public BlockCompactionsInCompletionRegion(Path tableDir, WAL log, FileSystem fs,
      Configuration confParam, RegionInfo info, TableDescriptor htd,
      RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    /** Creates stores of type {@link BlockCompactionsInCompletionHStore} for this region. */
    @Override
    protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup)
      throws IOException {
      return new BlockCompactionsInCompletionHStore(this, family, this.conf, warmup);
    }
  }

  /**
   * An {@link HStore} that consults the owning {@link CompactionBlockerRegion}'s latches before
   * refreshing its store size, which is where compactions get parked for
   * {@link BlockCompactionsInCompletionRegion}.
   */
  public static class BlockCompactionsInCompletionHStore extends HStore {
    CompactionBlockerRegion r;

    protected BlockCompactionsInCompletionHStore(HRegion region, ColumnFamilyDescriptor family,
      Configuration confParam, boolean warmup) throws IOException {
      super(region, family, confParam, warmup);
      r = (CompactionBlockerRegion) region;
    }

    /**
     * Signals arrival, waits until compactions are allowed, and then delegates to the superclass.
     * @throws IOException wrapping the {@link InterruptedException} if the wait is interrupted
     */
    @Override
    protected void refreshStoreSizeAndTotalBytes() throws IOException {
      // r is only assigned after the super constructor returns; presumably this guard covers a
      // call made while the store is still being constructed.
      if (r != null) {
        try {
          r.compactionsWaiting.countDown();
          r.compactionsBlocked.await();
        } catch (InterruptedException ex) {
          throw new IOException(ex);
        }
      }
      super.refreshStoreSizeAndTotalBytes();
    }
  }

  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private final static TableName TABLE_NAME = TableName.valueOf("tabletest");
  private final static byte[] FAMILY = Bytes.toBytes("family");
  private static final int FIRST_BATCH_COUNT = 4000;
  private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;

  /**
   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
   * compaction until after we have killed the server and the region has come up on a new
   * regionserver altogether. This fakes the double assignment case where region in one location
   * changes the files out from underneath a region being served elsewhere.
   */
  @Test
  public void testFencingAroundCompaction() throws Exception {
    for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) {
      doTest(BlockCompactionsInPrepRegion.class, policy);
    }
  }

  /**
   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
   * compaction completion until after we have killed the server and the region has come up on a new
   * regionserver altogether. This fakes the double assignment case where region in one location
   * changes the files out from underneath a region being served elsewhere.
   */
  @Test
  public void testFencingAroundCompactionAfterWALSync() throws Exception {
    for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) {
      doTest(BlockCompactionsInCompletionRegion.class, policy);
    }
  }

  /**
   * Runs one fencing scenario on a fresh single-server mini cluster: load a region, park its
   * compaction, expire the hosting server's session, wait for the region to come online on a newly
   * started server, verify the new region's store files exist, release the parked compaction, and
   * finally load more rows and check the row count. The mini cluster is always shut down.
   * @param regionClass the {@link HRegion} implementation to install via
   *                    {@link HConstants#REGION_IMPL}; cast to {@link CompactionBlockerRegion}
   * @param policy      the in-memory compaction policy to configure for the run
   */
  public void doTest(Class<?> regionClass, MemoryCompactionPolicy policy) throws Exception {
    Configuration c = TEST_UTIL.getConfiguration();
    // Insert our custom region
    c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
    // Encourage plenty of flushes
    c.setLong("hbase.hregion.memstore.flush.size", 25000);
    c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
    // Only run compaction when we tell it to
    c.setInt("hbase.hstore.compaction.min", 1);
    c.setInt("hbase.hstore.compactionThreshold", 1000);
    c.setLong("hbase.hstore.blockingStoreFiles", 1000);
    // Compact quickly after we tell it to!
    c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
    c.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(policy));
    LOG.info("Starting mini cluster");
    TEST_UTIL.startMiniCluster(1);
    CompactionBlockerRegion compactingRegion = null;
    Admin admin = null;
    Table table = null;
    try {
      LOG.info("Creating admin");
      admin = TEST_UTIL.getConnection().getAdmin();
      LOG.info("Creating table");
      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
      table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
      LOG.info("Loading test table");
      // Find the region
      List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
      assertEquals(1, testRegions.size());
      compactingRegion = (CompactionBlockerRegion) testRegions.get(0);
      LOG.info("Blocking compactions");
      compactingRegion.stopCompactions();
      long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores();
      // Load some rows
      TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);

      // add a compaction from an older (non-existing) region to see whether we successfully skip
      // those entries
      RegionInfo oldHri = RegionInfoBuilder.newBuilder(table.getName()).build();
      CompactionDescriptor compactionDescriptor =
        ProtobufUtil.toCompactionDescriptor(oldHri, FAMILY, Lists.newArrayList(new Path("/a")),
          Lists.newArrayList(new Path("/b")), new Path("store_dir"));
      WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
        ((HRegion) compactingRegion).getReplicationScope(), oldHri, compactionDescriptor,
        compactingRegion.getMVCC(), null);

      // Wait till flush has happened, otherwise there won't be multiple store files
      long startWaitTime = EnvironmentEdgeManager.currentTime();
      while (
        compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime
          || compactingRegion.countStoreFiles() <= 1
      ) {
        LOG.info("Waiting for the region to flush {}",
          compactingRegion.getRegionInfo().getRegionNameAsString());
        Thread.sleep(1000);
        admin.flush(table.getName());
        assertTrue("Timed out waiting for the region to flush",
          EnvironmentEdgeManager.currentTime() - startWaitTime < 30000);
      }
      assertTrue(compactingRegion.countStoreFiles() > 1);
      final byte[] regionName = compactingRegion.getRegionInfo().getRegionName();
      LOG.info("Asking for compaction");
      admin.majorCompact(TABLE_NAME);
      LOG.info("Waiting for compaction to be about to start");
      compactingRegion.waitForCompactionToBlock();
      LOG.info("Starting a new server");
      RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
      final HRegionServer newServer = newServerThread.getRegionServer();
      LOG.info("Killing region server ZK lease");
      TEST_UTIL.expireRegionServerSession(0);
      LOG.info("Waiting for the new server to pick up the region {}", Bytes.toString(regionName));

      // wait for region to be assigned and to go out of log replay if applicable
      Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          Region onlineRegion = newServer.getOnlineRegion(regionName);
          return onlineRegion != null;
        }
      });

      CompactionBlockerRegion newRegion =
        (CompactionBlockerRegion) newServer.getOnlineRegion(regionName);

      // After compaction of old region finishes on the server that was going down, make sure that
      // all the files we expect are still working when region is up in new location.
      FileSystem fs = newRegion.getFilesystem();
      for (String f : newRegion.getStoreFileList(new byte[][] { FAMILY })) {
        assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
      }
      LOG.info("Allowing compaction to proceed");
      compactingRegion.allowCompactions();
      while (compactingRegion.compactCount.get() == 0) {
        Thread.sleep(1000);
      }
      // The server we killed stays up until the compaction that was started before it was killed
      // completes. In logs you should see the old regionserver now going down.
      LOG.info("Compaction finished");

      // If we survive the split keep going...
      // Now we make sure that the region isn't totally confused. Load up more rows.
      TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT,
        FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
      admin.majorCompact(TABLE_NAME);
      startWaitTime = EnvironmentEdgeManager.currentTime();
      while (newRegion.compactCount.get() == 0) {
        Thread.sleep(1000);
        assertTrue("New region never compacted",
          EnvironmentEdgeManager.currentTime() - startWaitTime < 180000);
      }
      int count;
      for (int i = 0;; i++) {
        try {
          count = HBaseTestingUtil.countRows(table);
          break;
        } catch (DoNotRetryIOException e) {
          // wait up to 30s
          if (i >= 30 || !e.getMessage().contains("File does not exist")) {
            throw e;
          }
          Thread.sleep(1000);
        }
      }
      if (policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
        assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count);
      } else {
        assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count);
      }
    } finally {
      try {
        // Release any parked compaction first so that it cannot hold up the shutdown below.
        if (compactingRegion != null) {
          compactingRegion.allowCompactions();
        }
        // table and admin are still null if the test failed before they were obtained.
        if (table != null) {
          table.close();
        }
        if (admin != null) {
          admin.close();
        }
      } finally {
        // Always tear the cluster down, even if releasing the client resources above failed.
        TEST_UTIL.shutdownMiniCluster();
      }
    }
  }
}