001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.hamcrest.CoreMatchers.equalTo; 021import static org.hamcrest.CoreMatchers.notNullValue; 022import static org.hamcrest.CoreMatchers.nullValue; 023import static org.hamcrest.MatcherAssert.assertThat; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027import static org.mockito.ArgumentMatchers.any; 028import static org.mockito.Mockito.doAnswer; 029import static org.mockito.Mockito.mock; 030import static org.mockito.Mockito.when; 031 032import java.io.ByteArrayOutputStream; 033import java.io.File; 034import java.io.PrintStream; 035import java.util.ArrayList; 036import java.util.concurrent.ThreadLocalRandom; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellUtil; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseTestingUtil; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.KeyValue; 046import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.client.Delete; 049import org.apache.hadoop.hbase.client.Get; 050import org.apache.hadoop.hbase.client.Put; 051import org.apache.hadoop.hbase.client.Result; 052import org.apache.hadoop.hbase.client.Table; 053import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 054import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper; 055import org.apache.hadoop.hbase.regionserver.TestRecoveredEdits; 056import org.apache.hadoop.hbase.testclassification.LargeTests; 057import org.apache.hadoop.hbase.testclassification.MapReduceTests; 058import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.hbase.util.CommonFSUtils; 061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 062import org.apache.hadoop.hbase.util.LauncherSecurityManager; 063import org.apache.hadoop.hbase.util.MapReduceExtendedCell; 064import org.apache.hadoop.hbase.wal.WAL; 065import org.apache.hadoop.hbase.wal.WALEdit; 066import org.apache.hadoop.hbase.wal.WALKey; 067import org.apache.hadoop.mapreduce.Mapper; 068import org.apache.hadoop.mapreduce.Mapper.Context; 069import org.apache.hadoop.util.ToolRunner; 070import org.junit.AfterClass; 071import org.junit.BeforeClass; 072import org.junit.ClassRule; 073import org.junit.Rule; 074import org.junit.Test; 075import org.junit.experimental.categories.Category; 076import org.junit.rules.TestName; 077import org.mockito.invocation.InvocationOnMock; 078import org.mockito.stubbing.Answer; 079 080/** 081 * Basic test for the WALPlayer M/R tool 082 */ 083@Category({ MapReduceTests.class, LargeTests.class }) 084public class TestWALPlayer { 085 @ClassRule 086 public static final HBaseClassTestRule CLASS_RULE = 087 HBaseClassTestRule.forClass(TestWALPlayer.class); 088 089 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 090 private static SingleProcessHBaseCluster cluster; 091 private static Path rootDir; 092 private static Path walRootDir; 093 private static FileSystem fs; 094 private static FileSystem logFs; 095 private static Configuration conf; 096 097 @Rule 098 public TestName name = new TestName(); 099 100 @BeforeClass 101 public static void beforeClass() throws Exception { 102 conf = TEST_UTIL.getConfiguration(); 103 rootDir = TEST_UTIL.createRootDir(); 104 walRootDir = TEST_UTIL.createWALRootDir(); 105 fs = CommonFSUtils.getRootDirFileSystem(conf); 106 logFs = CommonFSUtils.getWALFileSystem(conf); 107 cluster = TEST_UTIL.startMiniCluster(); 108 } 109 110 @AfterClass 111 public static void afterClass() throws Exception { 112 TEST_UTIL.shutdownMiniCluster(); 113 fs.delete(rootDir, true); 114 logFs.delete(walRootDir, true); 115 } 116 117 /** 118 * Test that WALPlayer can replay recovered.edits files. 119 */ 120 @Test 121 public void testPlayingRecoveredEdit() throws Exception { 122 TableName tn = TableName.valueOf(TestRecoveredEdits.RECOVEREDEDITS_TABLENAME); 123 TEST_UTIL.createTable(tn, TestRecoveredEdits.RECOVEREDEDITS_COLUMNFAMILY); 124 // Copy testing recovered.edits file that is over under hbase-server test resources 125 // up into a dir in our little hdfs cluster here. 126 String hbaseServerTestResourcesEdits = 127 System.getProperty("test.build.classes") + "/../../../hbase-server/src/test/resources/" 128 + TestRecoveredEdits.RECOVEREDEDITS_PATH.getName(); 129 assertTrue(new File(hbaseServerTestResourcesEdits).exists()); 130 FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); 131 // Target dir. 132 Path targetDir = new Path("edits").makeQualified(dfs.getUri(), dfs.getHomeDirectory()); 133 assertTrue(dfs.mkdirs(targetDir)); 134 dfs.copyFromLocalFile(new Path(hbaseServerTestResourcesEdits), targetDir); 135 assertEquals(0, 136 ToolRunner.run(new WALPlayer(this.conf), new String[] { targetDir.toString() })); 137 // I don't know how many edits are in this file for this table... so just check more than 1. 138 assertTrue(TEST_UTIL.countRows(tn) > 0); 139 } 140 141 /** 142 * Tests that when you write multiple cells with the same timestamp they are properly sorted by 143 * their sequenceId in WALPlayer/CellSortReducer so that the correct one wins when querying from 144 * the resulting bulkloaded HFiles. See HBASE-27649 145 */ 146 @Test 147 public void testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception { 148 final TableName tableName = TableName.valueOf(name.getMethodName() + "1"); 149 final byte[] family = Bytes.toBytes("family"); 150 final byte[] column1 = Bytes.toBytes("c1"); 151 final byte[] column2 = Bytes.toBytes("c2"); 152 final byte[] row = Bytes.toBytes("row"); 153 Table table = TEST_UTIL.createTable(tableName, family); 154 155 long now = EnvironmentEdgeManager.currentTime(); 156 // put a row into the first table 157 Put p = new Put(row); 158 p.addColumn(family, column1, now, column1); 159 p.addColumn(family, column2, now, column2); 160 161 table.put(p); 162 163 byte[] lastVal = null; 164 165 for (int i = 0; i < 50; i++) { 166 lastVal = Bytes.toBytes(ThreadLocalRandom.current().nextLong()); 167 p = new Put(row); 168 p.addColumn(family, column1, now, lastVal); 169 170 table.put(p); 171 172 // wal rolling is necessary to trigger the bug. otherwise no sorting 173 // needs to occur in the reducer because it's all sorted and coming from a single file. 174 if (i % 10 == 0) { 175 WAL log = cluster.getRegionServer(0).getWAL(null); 176 log.rollWriter(); 177 } 178 } 179 180 WAL log = cluster.getRegionServer(0).getWAL(null); 181 log.rollWriter(); 182 String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(), 183 HConstants.HREGION_LOGDIR_NAME).toString(); 184 185 Configuration configuration = new Configuration(TEST_UTIL.getConfiguration()); 186 String outPath = "/tmp/" + name.getMethodName(); 187 configuration.set(WALPlayer.BULK_OUTPUT_CONF_KEY, outPath); 188 configuration.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true); 189 190 WALPlayer player = new WALPlayer(configuration); 191 assertEquals(0, ToolRunner.run(configuration, player, 192 new String[] { walInputDir, tableName.getNameAsString() })); 193 194 Get g = new Get(row); 195 Result result = table.get(g); 196 byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1)); 197 assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal))); 198 199 table = TEST_UTIL.truncateTable(tableName); 200 g = new Get(row); 201 result = table.get(g); 202 assertThat(result.listCells(), nullValue()); 203 204 BulkLoadHFiles.create(configuration).bulkLoad(tableName, 205 new Path(outPath, tableName.getNamespaceAsString() + "/" + tableName.getNameAsString())); 206 207 g = new Get(row); 208 result = table.get(g); 209 value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1)); 210 211 assertThat(result.listCells(), notNullValue()); 212 assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal))); 213 } 214 215 /** 216 * Simple end-to-end test 217 */ 218 @Test 219 public void testWALPlayer() throws Exception { 220 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); 221 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); 222 final byte[] FAMILY = Bytes.toBytes("family"); 223 final byte[] COLUMN1 = Bytes.toBytes("c1"); 224 final byte[] COLUMN2 = Bytes.toBytes("c2"); 225 final byte[] ROW = Bytes.toBytes("row"); 226 Table t1 = TEST_UTIL.createTable(tableName1, FAMILY); 227 Table t2 = TEST_UTIL.createTable(tableName2, FAMILY); 228 229 // put a row into the first table 230 Put p = new Put(ROW); 231 p.addColumn(FAMILY, COLUMN1, COLUMN1); 232 p.addColumn(FAMILY, COLUMN2, COLUMN2); 233 t1.put(p); 234 // delete one column 235 Delete d = new Delete(ROW); 236 d.addColumns(FAMILY, COLUMN1); 237 t1.delete(d); 238 239 // replay the WAL, map table 1 to table 2 240 WAL log = cluster.getRegionServer(0).getWAL(null); 241 log.rollWriter(); 242 String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(), 243 HConstants.HREGION_LOGDIR_NAME).toString(); 244 245 Configuration configuration = TEST_UTIL.getConfiguration(); 246 WALPlayer player = new WALPlayer(configuration); 247 String optionName = "_test_.name"; 248 configuration.set(optionName, "1000"); 249 player.setupTime(configuration, optionName); 250 assertEquals(1000, configuration.getLong(optionName, 0)); 251 assertEquals(0, ToolRunner.run(configuration, player, 252 new String[] { walInputDir, tableName1.getNameAsString(), tableName2.getNameAsString() })); 253 254 // verify the WAL was player into table 2 255 Get g = new Get(ROW); 256 Result r = t2.get(g); 257 assertEquals(1, r.size()); 258 assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2)); 259 } 260 261 /** 262 * Test WALKeyValueMapper setup and map 263 */ 264 @Test 265 public void testWALKeyValueMapper() throws Exception { 266 testWALKeyValueMapper(WALPlayer.TABLES_KEY); 267 } 268 269 @Test 270 public void testWALKeyValueMapperWithDeprecatedConfig() throws Exception { 271 testWALKeyValueMapper("hlog.input.tables"); 272 } 273 274 private void testWALKeyValueMapper(final String tableConfigKey) throws Exception { 275 Configuration configuration = new Configuration(); 276 configuration.set(tableConfigKey, "table"); 277 WALKeyValueMapper mapper = new WALKeyValueMapper(); 278 WALKey key = mock(WALKey.class); 279 when(key.getTableName()).thenReturn(TableName.valueOf("table")); 280 @SuppressWarnings("unchecked") 281 Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell>.Context context = mock(Context.class); 282 when(context.getConfiguration()).thenReturn(configuration); 283 284 WALEdit value = mock(WALEdit.class); 285 ArrayList<Cell> values = new ArrayList<>(); 286 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), null); 287 288 values.add(kv1); 289 when(value.getCells()).thenReturn(values); 290 mapper.setup(context); 291 292 doAnswer(new Answer<Void>() { 293 294 @Override 295 public Void answer(InvocationOnMock invocation) throws Throwable { 296 ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArgument(0); 297 MapReduceExtendedCell key = (MapReduceExtendedCell) invocation.getArgument(1); 298 assertEquals("row", Bytes.toString(writer.get())); 299 assertEquals("row", Bytes.toString(CellUtil.cloneRow(key))); 300 return null; 301 } 302 }).when(context).write(any(), any()); 303 304 mapper.map(key, value, context); 305 306 } 307 308 /** 309 * Test main method 310 */ 311 @Test 312 public void testMainMethod() throws Exception { 313 314 PrintStream oldPrintStream = System.err; 315 SecurityManager SECURITY_MANAGER = System.getSecurityManager(); 316 LauncherSecurityManager newSecurityManager = new LauncherSecurityManager(); 317 System.setSecurityManager(newSecurityManager); 318 ByteArrayOutputStream data = new ByteArrayOutputStream(); 319 String[] args = {}; 320 System.setErr(new PrintStream(data)); 321 try { 322 System.setErr(new PrintStream(data)); 323 try { 324 WALPlayer.main(args); 325 fail("should be SecurityException"); 326 } catch (SecurityException e) { 327 assertEquals(-1, newSecurityManager.getExitCode()); 328 assertTrue(data.toString().contains("ERROR: Wrong number of arguments:")); 329 assertTrue(data.toString() 330 .contains("Usage: WALPlayer [options] <WAL inputdir>" + " [<tables> <tableMappings>]")); 331 assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output")); 332 } 333 334 } finally { 335 System.setErr(oldPrintStream); 336 System.setSecurityManager(SECURITY_MANAGER); 337 } 338 339 } 340 341}