/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that verifies Procedure V2. DDL operations should go through (rollforward or
 * rollback) when primary master is killed by ChaosMonkey (default MASTER_KILLING).
 * <p>
 * Multiple Worker threads are started to randomly do the following Actions in loops.
 * </p>
 * Actions generating and populating tables:
 * <ul>
 * <li>CreateTableAction</li>
 * <li>DisableTableAction</li>
 * <li>EnableTableAction</li>
 * <li>DeleteTableAction</li>
 * <li>AddRowAction</li>
 * </ul>
 * Actions performing column family DDL operations:
 * <ul>
 * <li>AddColumnFamilyAction</li>
 * <li>AlterFamilyVersionsAction</li>
 * <li>AlterFamilyEncodingAction</li>
 * <li>DeleteColumnFamilyAction</li>
 * </ul>
 * Actions performing namespace DDL operations:
 * <ul>
 * <li>CreateNamespaceAction</li>
 * <li>ModifyNamespaceAction</li>
 * <li>DeleteNamespaceAction</li>
 * </ul>
 * <p>
 * The threads run for a period of time (default 20 minutes) then are stopped at the end of runtime.
 * Verification is performed towards those checkpoints:
 * </p>
 * <ol>
 * <li>No Actions throw Exceptions.</li>
 * <li>No inconsistencies are detected in hbck.</li>
 * </ol>
 * <p>
 * This test should be run by the hbase user since it invokes hbck at the end.
 * </p>
 * <p>
 * Usage: hbase org.apache.hadoop.hbase.IntegrationTestDDLMasterFailover
 * -Dhbase.IntegrationTestDDLMasterFailover.runtime=1200000
 * -Dhbase.IntegrationTestDDLMasterFailover.numThreads=20
 * -Dhbase.IntegrationTestDDLMasterFailover.numRegions=50 --monkey masterKilling
 * </p>
 */
@Category(IntegrationTests.class)
public class IntegrationTestDDLMasterFailover extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestDDLMasterFailover.class);

  private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster

  /** Names of the tables created by this test; used to find what has to be cleaned up. */
  private static final Pattern TEST_TABLE_PATTERN = Pattern.compile("ittable-\\d+");

  /** Names of the namespaces created by this test; used to find what has to be cleaned up. */
  private static final Pattern TEST_NAMESPACE_PATTERN = Pattern.compile("itnamespace\\d+");

  protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;

  protected static final int DEFAULT_NUM_THREADS = 20;

  protected static final int DEFAULT_NUM_REGIONS = 50; // number of regions in pre-split tables

  /** Set when hbck reports inconsistencies so that the objects survive for inspection. */
  private boolean keepObjectsAtTheEnd = false;

  protected HBaseClusterInterface cluster;

  protected Connection connection;

  /**
   * Configuration key templates; {@code %s} is replaced by the simple name of the test class.
   * The runtime is a soft limit on how long we should run.
   */
  protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
  protected static final String NUM_THREADS_KEY = "hbase.%s.numThreads";
  protected static final String NUM_REGIONS_KEY = "hbase.%s.numRegions";

  /** Cleared when the runtime is up; Worker threads stop at their next loop iteration. */
  protected AtomicBoolean running = new AtomicBoolean(true);

  /** Cleared half way through the run so that no more tables get created. */
  protected AtomicBoolean create_table = new AtomicBoolean(true);

  protected int numThreads, numRegions;

  // Objects the test believes to exist, keyed by name. An Action removes an entry while it is
  // working on the object and puts it back (possibly into another map) when it is done.
  ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap = new ConcurrentHashMap<>();

  ConcurrentHashMap<TableName, TableDescriptor> enabledTables = new ConcurrentHashMap<>();

  ConcurrentHashMap<TableName, TableDescriptor> disabledTables = new ConcurrentHashMap<>();

  ConcurrentHashMap<TableName, TableDescriptor> deletedTables = new ConcurrentHashMap<>();

  /**
   * Returns a random non-negative number rendered as exactly ten digits. The bound keeps the
   * sign out of generated names, so they keep matching the cleanup patterns and the
   * {@code row-0000000000}..{@code row-2147483647} split range.
   */
  private static String randomTenDigits() {
    return String.format("%010d", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    int minServerCount = getMinServerCount();
    LOG.debug("Initializing/checking cluster has {} servers", minServerCount);
    util.initializeCluster(minServerCount);
    LOG.debug("Done initializing/checking cluster");
    cluster = util.getHBaseClusterInterface();
  }

  /**
   * Drops the tables and namespaces created by this test (unless hbck found inconsistencies),
   * forgets the tracked state and closes the shared connection.
   */
  @Override
  public void cleanUpCluster() throws Exception {
    if (!keepObjectsAtTheEnd) {
      Admin admin = util.getAdmin();
      for (TableName tableName : admin.listTableNames(TEST_TABLE_PATTERN)) {
        // The test leaves many tables disabled on purpose; only disable the ones that are still
        // enabled, otherwise the disable request fails and aborts the cleanup.
        if (admin.isTableEnabled(tableName)) {
          admin.disableTable(tableName);
        }
        admin.deleteTable(tableName);
      }
      for (NamespaceDescriptor nsd : admin.listNamespaceDescriptors()) {
        if (TEST_NAMESPACE_PATTERN.matcher(nsd.getName()).matches()) {
          LOG.info("Removing namespace={}", nsd.getName());
          admin.deleteNamespace(nsd.getName());
        }
      }
    }

    enabledTables.clear();
    disabledTables.clear();
    deletedTables.clear();
    namespaceMap.clear();

    // getConnection() returns null when the connection could not be established.
    Connection conn = getConnection();
    if (conn != null) {
      conn.close();
    }
    super.cleanUpCluster();
  }

  protected int getMinServerCount() {
    return SERVER_COUNT;
  }

  protected synchronized void setConnection(Connection connection) {
    this.connection = connection;
  }

  /**
   * Returns the shared connection, creating it on first use.
   * @return the connection, or {@code null} if it could not be established (the failure is
   *         logged)
   */
  protected synchronized Connection getConnection() {
    if (this.connection == null) {
      try {
        setConnection(ConnectionFactory.createConnection(getConf()));
      } catch (IOException e) {
        LOG.error(HBaseMarkers.FATAL, "Failed to establish connection.", e);
      }
    }
    return this.connection;
  }

  /** Asserts that every namespace tracked in {@link #namespaceMap} exists on the cluster. */
  protected void verifyNamespaces() throws IOException {
    // try-with-resources: a failing assertion must not leak the Admin
    try (Admin admin = getConnection().getAdmin()) {
      // iterating concurrent map
      for (String nsName : namespaceMap.keySet()) {
        try {
          Assert.assertTrue("Namespace: " + nsName + " in namespaceMap does not exist",
            admin.getNamespaceDescriptor(nsName) != null);
        } catch (NamespaceNotFoundException nsnfe) {
          Assert
            .fail("Namespace: " + nsName + " in namespaceMap does not exist: " + nsnfe.getMessage());
        }
      }
    }
  }

  /** Asserts that every tracked table is in the state implied by the map that tracks it. */
  protected void verifyTables() throws IOException {
    // try-with-resources: a failing assertion must not leak the Admin
    try (Admin admin = getConnection().getAdmin()) {
      // iterating concurrent map
      for (TableName tableName : enabledTables.keySet()) {
        Assert.assertTrue("Table: " + tableName + " in enabledTables is not enabled",
          admin.isTableEnabled(tableName));
      }
      for (TableName tableName : disabledTables.keySet()) {
        Assert.assertTrue("Table: " + tableName + " in disabledTables is not disabled",
          admin.isTableDisabled(tableName));
      }
      for (TableName tableName : deletedTables.keySet()) {
        Assert.assertFalse("Table: " + tableName + " in deletedTables is not deleted",
          admin.tableExists(tableName));
      }
    }
  }

  @Test
  public void testAsUnitTest() throws Exception {
    runTest();
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    return runTest();
  }

  /** One randomly chosen DDL/DML operation executed by a {@link Worker}. */
  private abstract class MasterAction {
    final Connection connection = getConnection();

    abstract void perform() throws IOException;
  }

  private abstract class NamespaceAction extends MasterAction {
    final String nsTestConfigKey = "hbase.namespace.testKey";

    /**
     * Randomly picks a namespace and removes it from the given map, so that no other thread
     * works on it at the same time.
     * @return the removed descriptor, or {@code null} if the map is empty
     */
    protected NamespaceDescriptor
      selectNamespace(ConcurrentHashMap<String, NamespaceDescriptor> namespaces) {
      // synchronization to prevent removal from multiple threads
      synchronized (namespaces) {
        if (namespaces.isEmpty()) {
          return null;
        }
        List<String> names = new ArrayList<>(namespaces.keySet());
        String randomKey = names.get(ThreadLocalRandom.current().nextInt(names.size()));
        return namespaces.remove(randomKey);
      }
    }

    /**
     * Tells whether the namespace exists. Currently the lookup never returns null and always
     * throws when the namespace is not found; the null check is defensive in case that changes.
     */
    protected boolean namespaceExists(Admin admin, String namespaceName) throws IOException {
      try {
        return admin.getNamespaceDescriptor(namespaceName) != null;
      } catch (NamespaceNotFoundException nsnfe) {
        return false;
      }
    }
  }

  private class CreateNamespaceAction extends NamespaceAction {
    @Override
    void perform() throws IOException {
      try (Admin admin = connection.getAdmin()) {
        // keep generating until the random name is not taken yet
        NamespaceDescriptor nsd = createNamespaceDesc();
        while (namespaceExists(admin, nsd.getName())) {
          nsd = createNamespaceDesc();
        }
        LOG.info("Creating namespace:{}", nsd);
        admin.createNamespace(nsd);
        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(nsd.getName());
        Assert.assertTrue("Namespace: " + nsd + " was not created", freshNamespaceDesc != null);
        namespaceMap.put(nsd.getName(), freshNamespaceDesc);
        LOG.info("Created namespace:{}", freshNamespaceDesc);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }

    private NamespaceDescriptor createNamespaceDesc() {
      // digits only: the name has to match TEST_NAMESPACE_PATTERN to be cleaned up
      String namespaceName = "itnamespace" + randomTenDigits();
      NamespaceDescriptor nsd = NamespaceDescriptor.create(namespaceName).build();
      nsd.setConfiguration(nsTestConfigKey, randomTenDigits());
      return nsd;
    }
  }

  private class ModifyNamespaceAction extends NamespaceAction {
    @Override
    void perform() throws IOException {
      NamespaceDescriptor selected = selectNamespace(namespaceMap);
      if (selected == null) {
        return;
      }

      try (Admin admin = connection.getAdmin()) {
        String namespaceName = selected.getName();
        LOG.info("Modifying namespace :{}", selected);
        NamespaceDescriptor modifiedNsd = NamespaceDescriptor.create(namespaceName).build();
        // pick a value that differs from the current one; null-safe if the key is absent
        String nsValueNew;
        do {
          nsValueNew = randomTenDigits();
        } while (nsValueNew.equals(selected.getConfigurationValue(nsTestConfigKey)));
        modifiedNsd.setConfiguration(nsTestConfigKey, nsValueNew);
        admin.modifyNamespace(modifiedNsd);
        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(namespaceName);
        Assert.assertTrue("Namespace: " + selected + " was not modified",
          nsValueNew.equals(freshNamespaceDesc.getConfigurationValue(nsTestConfigKey)));
        Assert.assertTrue("Namespace: " + namespaceName + " does not exist",
          admin.getNamespaceDescriptor(namespaceName) != null);
        namespaceMap.put(namespaceName, freshNamespaceDesc);
        LOG.info("Modified namespace :{}", freshNamespaceDesc);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private class DeleteNamespaceAction extends NamespaceAction {
    @Override
    void perform() throws IOException {
      NamespaceDescriptor selected = selectNamespace(namespaceMap);
      if (selected == null) {
        return;
      }

      try (Admin admin = connection.getAdmin()) {
        String namespaceName = selected.getName();
        LOG.info("Deleting namespace :{}", selected);
        admin.deleteNamespace(namespaceName);
        Assert.assertFalse("Namespace: " + selected + " was not deleted",
          namespaceExists(admin, namespaceName));
        LOG.info("Deleted namespace :{}", selected);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private abstract class TableAction extends MasterAction {
    /**
     * Randomly picks a table and removes it from the given map, so that no other thread works
     * on it at the same time. The caller is expected to put the table back into the map that
     * matches its state once the action is done.
     * @return the removed descriptor, or {@code null} if the map is empty
     */
    protected TableDescriptor selectTable(ConcurrentHashMap<TableName, TableDescriptor> tableMap) {
      // synchronization to prevent removal from multiple threads
      synchronized (tableMap) {
        if (tableMap.isEmpty()) {
          return null;
        }
        List<TableName> tableNames = new ArrayList<>(tableMap.keySet());
        TableName key = tableNames.get(ThreadLocalRandom.current().nextInt(tableNames.size()));
        return tableMap.remove(key);
      }
    }
  }

  private class CreateTableAction extends TableAction {

    @Override
    void perform() throws IOException {
      try (Admin admin = connection.getAdmin()) {
        TableDescriptor td = createTableDesc();
        TableName tableName = td.getTableName();
        if (admin.tableExists(tableName)) {
          return;
        }
        byte[] startKey = Bytes.toBytes("row-0000000000");
        byte[] endKey = Bytes.toBytes("row-" + Integer.MAX_VALUE);
        LOG.info("Creating table:{}", td);
        // numRegions is resolved once in runTest() before the workers are started
        admin.createTable(td, startKey, endKey, numRegions);
        Assert.assertTrue("Table: " + td + " was not created", admin.tableExists(tableName));
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        Assert.assertTrue("After create, Table: " + tableName + " in not enabled",
          admin.isTableEnabled(tableName));
        enabledTables.put(tableName, freshTableDesc);
        LOG.info("Created table:{}", freshTableDesc);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }

    private TableDescriptor createTableDesc() {
      // the name has to match TEST_TABLE_PATTERN to be cleaned up
      String tableName = "ittable-" + randomTenDigits();
      String familyName = "cf-" + ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
      return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(familyName)).build();
    }
  }

  private class DisableTableAction extends TableAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(enabledTables);
      if (selected == null) {
        return;
      }

      try (Admin admin = connection.getAdmin()) {
        TableName tableName = selected.getTableName();
        LOG.info("Disabling table :{}", selected);
        admin.disableTable(tableName);
        Assert.assertTrue("Table: " + selected + " was not disabled",
          admin.isTableDisabled(tableName));
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        Assert.assertTrue("After disable, Table: " + tableName + " is not disabled",
          admin.isTableDisabled(tableName));
        disabledTables.put(tableName, freshTableDesc);
        LOG.info("Disabled table :{}", freshTableDesc);
      } catch (TableNotEnabledException e) {
        // TODO workaround
        // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
        // operations
        // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK node
        // in 1.x), which will be further changed to ENABLED/DISABLED once the operation completes
        // 2) if master failover happens in the middle of the enable/disable operation, the new
        // master will try to recover the tables in ENABLING/DISABLING state, as programmed in
        // AssignmentManager#recoverTableInEnablingState() and
        // AssignmentManager#recoverTableInDisablingState()
        // 3) after the new master initialization completes, the procedure tries to re-do the
        // enable/disable operation, which was already done. Ignore those exceptions before change
        // of behaviors of AssignmentManager in presence of PV2
        LOG.warn("Caught exception in action: {}", this.getClass());
        LOG.warn("Caught TableNotEnabledException in action: {}", this.getClass(), e);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private class EnableTableAction extends TableAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }

      try (Admin admin = connection.getAdmin()) {
        TableName tableName = selected.getTableName();
        LOG.info("Enabling table :{}", selected);
        admin.enableTable(tableName);
        Assert.assertTrue("Table: " + selected + " was not enabled",
          admin.isTableEnabled(tableName));
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        Assert.assertTrue("After enable, Table: " + tableName + " in not enabled",
          admin.isTableEnabled(tableName));
        enabledTables.put(tableName, freshTableDesc);
        LOG.info("Enabled table :{}", freshTableDesc);
      } catch (TableNotDisabledException e) {
        // TODO workaround
        // Same reasoning as in DisableTableAction: after a master failover in the middle of an
        // enable/disable operation the procedure may re-do an operation that was already done.
        // Ignore those exceptions before change of behaviors of AssignmentManager in presence of
        // PV2
        LOG.warn("Caught exception in action: {}", this.getClass());
        LOG.warn("Caught TableNotDisabledException in action: {}", this.getClass(), e);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private class DeleteTableAction extends TableAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }

      try (Admin admin = connection.getAdmin()) {
        TableName tableName = selected.getTableName();
        LOG.info("Deleting table :{}", selected);
        admin.deleteTable(tableName);
        Assert.assertFalse("Table: " + selected + " was not deleted", admin.tableExists(tableName));
        deletedTables.put(tableName, selected);
        LOG.info("Deleted table :{}", selected);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private abstract class ColumnAction extends TableAction {
    /**
     * Randomly picks one of the column families of the given table.
     * @return the family, or {@code null} if the table is {@code null} or has no families
     */
    protected ColumnFamilyDescriptor selectFamily(TableDescriptor td) {
      if (td == null) {
        return null;
      }
      ColumnFamilyDescriptor[] families = td.getColumnFamilies();
      if (families.length == 0) {
        LOG.info("No column families in table: {}", td);
        return null;
      }
      return families[ThreadLocalRandom.current().nextInt(families.length)];
    }
  }

  private class AddColumnFamilyAction extends ColumnAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }
      TableName tableName = selected.getTableName();
      ColumnFamilyDescriptor cfd = createFamilyDesc();
      if (selected.hasColumnFamily(cfd.getName())) {
        LOG.info("{} already exists in table {}", Bytes.toString(cfd.getName()), tableName);
        // nothing was changed: keep tracking the table instead of silently dropping it
        disabledTables.put(tableName, selected);
        return;
      }

      try (Admin admin = connection.getAdmin()) {
        LOG.info("Adding column family: {} to table: {}", cfd, tableName);
        admin.addColumnFamily(tableName, cfd);
        // assertion
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        Assert.assertTrue("Column family: " + cfd + " was not added",
          freshTableDesc.hasColumnFamily(cfd.getName()));
        Assert.assertTrue("After add column family, Table: " + tableName + " is not disabled",
          admin.isTableDisabled(tableName));
        disabledTables.put(tableName, freshTableDesc);
        LOG.info("Added column family: {} to table: {}", cfd, tableName);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }

    private ColumnFamilyDescriptor createFamilyDesc() {
      return ColumnFamilyDescriptorBuilder.of("cf-" + randomTenDigits());
    }
  }

  private class AlterFamilyVersionsAction extends ColumnAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }
      TableName tableName = selected.getTableName();
      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
      if (columnDesc == null) {
        // nothing was changed: keep tracking the table instead of silently dropping it
        disabledTables.put(tableName, selected);
        return;
      }

      int versions = ThreadLocalRandom.current().nextInt(10) + 3;
      try (Admin admin = connection.getAdmin()) {
        LOG.info("Altering versions of column family: {} to: {} in table: {}", columnDesc,
          versions, tableName);

        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
          .setMinVersions(versions).setMaxVersions(versions).build();
        TableDescriptor td =
          TableDescriptorBuilder.newBuilder(selected).modifyColumnFamily(cfd).build();
        admin.modifyTable(td);

        // assertion
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        ColumnFamilyDescriptor freshColumnDesc =
          freshTableDesc.getColumnFamily(columnDesc.getName());
        Assert.assertEquals("Column family: " + columnDesc + " was not altered", versions,
          freshColumnDesc.getMaxVersions());
        Assert.assertEquals("Column family: " + freshColumnDesc + " was not altered", versions,
          freshColumnDesc.getMinVersions());
        Assert.assertTrue(
          "After alter versions of column family, Table: " + tableName + " is not disabled",
          admin.isTableDisabled(tableName));
        disabledTables.put(tableName, freshTableDesc);
        LOG.info("Altered versions of column family: {} to: {} in table: {}", columnDesc,
          versions, tableName);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private class AlterFamilyEncodingAction extends ColumnAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }
      TableName tableName = selected.getTableName();
      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
      if (columnDesc == null) {
        // nothing was changed: keep tracking the table instead of silently dropping it
        disabledTables.put(tableName, selected);
        return;
      }

      try (Admin admin = connection.getAdmin()) {
        // possible DataBlockEncoding ids
        DataBlockEncoding[] possibleIds = { DataBlockEncoding.NONE, DataBlockEncoding.PREFIX,
          DataBlockEncoding.DIFF, DataBlockEncoding.FAST_DIFF, DataBlockEncoding.ROW_INDEX_V1 };
        short id = possibleIds[ThreadLocalRandom.current().nextInt(possibleIds.length)].getId();
        LOG.info("Altering encoding of column family: {} to: {} in table: {}", columnDesc, id,
          tableName);

        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
          .setDataBlockEncoding(DataBlockEncoding.getEncodingById(id)).build();
        TableDescriptor td =
          TableDescriptorBuilder.newBuilder(selected).modifyColumnFamily(cfd).build();
        admin.modifyTable(td);

        // assertion
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        ColumnFamilyDescriptor freshColumnDesc =
          freshTableDesc.getColumnFamily(columnDesc.getName());
        Assert.assertEquals("Encoding of column family: " + columnDesc + " was not altered", id,
          freshColumnDesc.getDataBlockEncoding().getId());
        Assert.assertTrue(
          "After alter encoding of column family, Table: " + tableName + " is not disabled",
          admin.isTableDisabled(tableName));
        disabledTables.put(tableName, freshTableDesc);
        LOG.info("Altered encoding of column family: {} to: {} in table: {}", freshColumnDesc, id,
          tableName);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private class DeleteColumnFamilyAction extends ColumnAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }
      TableName tableName = selected.getTableName();
      // a table must keep at least one column family
      if (selected.getColumnFamilyCount() < 2) {
        LOG.info("No enough column families to delete in table {}", tableName);
        // nothing was changed: keep tracking the table instead of silently dropping it
        disabledTables.put(tableName, selected);
        return;
      }
      // cannot be null here, the table has at least two families
      ColumnFamilyDescriptor cfd = selectFamily(selected);

      try (Admin admin = connection.getAdmin()) {
        LOG.info("Deleting column family: {} from table: {}", cfd, tableName);
        admin.deleteColumnFamily(tableName, cfd.getName());
        // assertion
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        Assert.assertFalse("Column family: " + cfd + " was not deleted",
          freshTableDesc.hasColumnFamily(cfd.getName()));
        Assert.assertTrue("After delete column family, Table: " + tableName + " is not disabled",
          admin.isTableDisabled(tableName));
        disabledTables.put(tableName, freshTableDesc);
        LOG.info("Deleted column family: {} from table: {}", cfd, tableName);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private class AddRowAction extends ColumnAction {
    // populate tables
    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(enabledTables);
      if (selected == null) {
        return;
      }

      TableName tableName = selected.getTableName();
      try (Admin admin = connection.getAdmin(); Table table = connection.getTable(tableName)) {
        List<RegionInfo> regionInfos = admin.getRegions(tableName);
        // average number of rows to be added per action to each region
        int averageRows = 1;
        int numRows = averageRows * regionInfos.size();
        LOG.info("Adding {} rows to table: {}", numRows, selected);
        byte[] value = new byte[10];
        for (int i = 0; i < numRows; i++) {
          // positive numbers only, so the keys fall inside the pre-split range of the table
          byte[] rowKey = Bytes.toBytes("row-" + randomTenDigits());
          ColumnFamilyDescriptor cfd = selectFamily(selected);
          if (cfd == null) {
            // nothing can be written: keep tracking the table instead of silently dropping it
            enabledTables.put(tableName, selected);
            return;
          }
          byte[] family = cfd.getName();
          byte[] qualifier = Bytes.toBytes("col-" + ThreadLocalRandom.current().nextInt(10));
          Bytes.random(value);
          Put put = new Put(rowKey);
          put.addColumn(family, qualifier, value);
          table.put(put);
        }
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        Assert.assertTrue("After insert, Table: " + tableName + " in not enabled",
          admin.isTableEnabled(tableName));
        enabledTables.put(tableName, freshTableDesc);
        LOG.info("Added {} rows to table: {}", numRows, selected);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private enum ACTION {
    CREATE_NAMESPACE,
    MODIFY_NAMESPACE,
    DELETE_NAMESPACE,
    CREATE_TABLE,
    DISABLE_TABLE,
    ENABLE_TABLE,
    DELETE_TABLE,
    ADD_COLUMNFAMILY,
    DELETE_COLUMNFAMILY,
    ALTER_FAMILYVERSIONS,
    ALTER_FAMILYENCODING,
    ADD_ROW
  }

  /**
   * Performs randomly selected actions until the test stops running or an action fails. The
   * failure and the action that caused it are kept for {@link #checkException(List)}.
   */
  private class Worker extends Thread {

    // volatile: both are read by the main thread while the worker may still be running
    private volatile Exception savedException;

    private volatile ACTION action;

    @Override
    public void run() {
      ACTION[] actions = ACTION.values();
      while (running.get()) {
        // select random action
        ACTION selectedAction = actions[ThreadLocalRandom.current().nextInt(actions.length)];
        this.action = selectedAction;
        LOG.info("Performing Action: {}", selectedAction);

        try {
          switch (selectedAction) {
            case CREATE_NAMESPACE:
              new CreateNamespaceAction().perform();
              break;
            case MODIFY_NAMESPACE:
              new ModifyNamespaceAction().perform();
              break;
            case DELETE_NAMESPACE:
              new DeleteNamespaceAction().perform();
              break;
            case CREATE_TABLE:
              // stop creating new tables in the later stage of the test to avoid too many empty
              // tables
              if (create_table.get()) {
                new CreateTableAction().perform();
              }
              break;
            case ADD_ROW:
              new AddRowAction().perform();
              break;
            case DISABLE_TABLE:
              new DisableTableAction().perform();
              break;
            case ENABLE_TABLE:
              new EnableTableAction().perform();
              break;
            case DELETE_TABLE:
              // reduce probability of deleting table to 20%
              if (ThreadLocalRandom.current().nextInt(100) < 20) {
                new DeleteTableAction().perform();
              }
              break;
            case ADD_COLUMNFAMILY:
              new AddColumnFamilyAction().perform();
              break;
            case DELETE_COLUMNFAMILY:
              // reduce probability of deleting column family to 20%
              if (ThreadLocalRandom.current().nextInt(100) < 20) {
                new DeleteColumnFamilyAction().perform();
              }
              break;
            case ALTER_FAMILYVERSIONS:
              new AlterFamilyVersionsAction().perform();
              break;
            case ALTER_FAMILYENCODING:
              new AlterFamilyEncodingAction().perform();
              break;
          }
        } catch (Exception ex) {
          this.savedException = ex;
          return;
        }
      }
      LOG.info("{} stopped", this.getName());
    }

    public Exception getSavedException() {
      return this.savedException;
    }

    public ACTION getAction() {
      return this.action;
    }
  }

  /** Fails the test if any of the given workers has recorded an exception. */
  private void checkException(List<Worker> workers) {
    if (workers == null || workers.isEmpty()) {
      return;
    }
    for (Worker worker : workers) {
      Exception e = worker.getSavedException();
      if (e != null) {
        LOG.error("Found exception in thread: " + worker.getName(), e);
      }
      Assert.assertNull("Action failed: " + worker.getAction() + " in thread: " + worker.getName(),
        e);
    }
  }

  /**
   * Runs the workers for the configured runtime, then verifies that no action failed, that the
   * tracked namespaces and tables are in the expected state and that hbck finds no
   * inconsistencies.
   * @return 0 on success; failures surface as assertion errors or exceptions
   */
  private int runTest() throws Exception {
    LOG.info("Starting the test");

    // All keys use the name of the test class, as documented in the usage section of the class
    // comment.
    String testName = this.getClass().getSimpleName();
    long runtime =
      util.getConfiguration().getLong(String.format(RUN_TIME_KEY, testName), DEFAULT_RUN_TIME);
    numThreads =
      util.getConfiguration().getInt(String.format(NUM_THREADS_KEY, testName), DEFAULT_NUM_THREADS);
    // resolved before any worker starts, so CreateTableAction only ever reads it
    numRegions = getConf().getInt(String.format(NUM_REGIONS_KEY, testName), DEFAULT_NUM_REGIONS);

    List<Worker> workers = new ArrayList<>(numThreads);
    for (int i = 0; i < numThreads; i++) {
      checkException(workers);
      Worker worker = new Worker();
      LOG.info("Launching worker thread {}", worker.getName());
      workers.add(worker);
      worker.start();
    }

    Threads.sleep(runtime / 2);
    LOG.info("Stopping creating new tables");
    create_table.set(false);
    Threads.sleep(runtime / 2);
    LOG.info("Runtime is up");
    running.set(false);

    checkException(workers);

    for (Worker worker : workers) {
      worker.join();
    }
    LOG.info("All Worker threads stopped");

    // verify
    LOG.info("Verify actions of all threads succeeded");
    checkException(workers);
    LOG.info("Verify namespaces");
    verifyNamespaces();
    LOG.info("Verify states of all tables");
    verifyTables();

    // RUN HBCK

    HBaseFsck hbck = null;
    try {
      LOG.info("Running hbck");
      hbck = HbckTestingUtil.doFsck(util.getConfiguration(), false);
      if (HbckTestingUtil.inconsistencyFound(hbck)) {
        // Find the inconsistency during HBCK. Leave table and namespace undropped so that
        // we can check outside the test.
        keepObjectsAtTheEnd = true;
      }
      HbckTestingUtil.assertNoErrors(hbck);
      LOG.info("Finished hbck");
    } finally {
      if (hbck != null) {
        hbck.close();
      }
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return null; // This test is not intended to run with stock Chaos Monkey
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return null; // This test is not intended to run with stock Chaos Monkey
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    IntegrationTestDDLMasterFailover masterFailover = new IntegrationTestDDLMasterFailover();
    Connection connection = null;
    int ret = 1;
    try {
      // Initialize connection once, then pass to Actions
      LOG.debug("Setting up connection ...");
      connection = ConnectionFactory.createConnection(conf);
      masterFailover.setConnection(connection);
      ret = ToolRunner.run(conf, masterFailover, args);
    } catch (IOException e) {
      LOG.error(HBaseMarkers.FATAL, "Failed to establish connection. Aborting test ...", e);
    } finally {
      connection = masterFailover.getConnection();
      if (connection != null) {
        connection.close();
      }
      System.exit(ret);
    }
  }
}