001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.List; 026import java.util.NavigableSet; 027import java.util.TreeSet; 028import java.util.UUID; 029import java.util.regex.Matcher; 030import java.util.regex.Pattern; 031import org.apache.commons.lang3.ArrayUtils; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileAlreadyExistsException; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.fs.PathFilter; 038import org.apache.hadoop.hbase.Cell; 039import org.apache.hadoop.hbase.CellScanner; 040import org.apache.hadoop.hbase.CellUtil; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.Delete; 044import org.apache.hadoop.hbase.client.Durability; 045import org.apache.hadoop.hbase.client.Mutation; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.regionserver.HRegion; 049import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.CommonFSUtils; 052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 053import org.apache.hadoop.hbase.util.FSUtils; 054import org.apache.hadoop.hbase.util.IOExceptionSupplier; 055import org.apache.hadoop.hbase.util.Pair; 056import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; 057import org.apache.yetus.audience.InterfaceAudience; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 064 065/** 066 * This class provides static methods to support WAL splitting related works 067 */ 068@InterfaceAudience.Private 069public final class WALSplitUtil { 070 private static final Logger LOG = LoggerFactory.getLogger(WALSplitUtil.class); 071 072 private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+"); 073 private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; 074 private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid"; 075 private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid"; 076 private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length(); 077 078 private WALSplitUtil() { 079 } 080 081 /** 082 * Completes the work done by splitLogFile by archiving logs 083 * <p> 084 * It is invoked by SplitLogManager once it knows that one of the SplitLogWorkers have completed 085 * the splitLogFile() part. If the master crashes then this function might get called multiple 086 * times. 087 * <p> 088 */ 089 public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException { 090 Path walDir = CommonFSUtils.getWALRootDir(conf); 091 Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME); 092 Path walPath; 093 if (CommonFSUtils.isStartingWithPath(walDir, logfile)) { 094 walPath = new Path(logfile); 095 } else { 096 walPath = new Path(walDir, logfile); 097 } 098 FileSystem walFS = walDir.getFileSystem(conf); 099 boolean corrupt = ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS); 100 archive(walPath, corrupt, oldLogDir, walFS, conf); 101 Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName()); 102 walFS.delete(stagingDir, true); 103 } 104 105 /** 106 * Moves processed logs to a oldLogDir after successful processing Moves corrupted logs (any log 107 * that couldn't be successfully parsed to corruptDir (.corrupt) for later investigation 108 */ 109 static void archive(final Path wal, final boolean corrupt, final Path oldWALDir, 110 final FileSystem walFS, final Configuration conf) throws IOException { 111 Path dir; 112 Path target; 113 if (corrupt) { 114 dir = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); 115 if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) { 116 LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", dir); 117 } 118 target = new Path(dir, wal.getName()); 119 } else { 120 dir = oldWALDir; 121 target = AbstractFSWAL.getWALArchivePath(oldWALDir, wal); 122 } 123 mkdir(walFS, dir); 124 moveWAL(walFS, wal, target); 125 } 126 127 private static void mkdir(FileSystem fs, Path dir) throws IOException { 128 if (!fs.mkdirs(dir)) { 129 LOG.warn("Failed mkdir {}", dir); 130 } 131 } 132 133 /** 134 * Move WAL. Used to move processed WALs to archive or bad WALs to corrupt WAL dir. WAL may have 135 * already been moved; makes allowance. 136 */ 137 public static void moveWAL(FileSystem fs, Path p, Path targetDir) throws IOException { 138 if (fs.exists(p)) { 139 if (!CommonFSUtils.renameAndSetModifyTime(fs, p, targetDir)) { 140 LOG.warn("Failed move of {} to {}", p, targetDir); 141 } else { 142 LOG.info("Moved {} to {}", p, targetDir); 143 } 144 } 145 } 146 147 /** 148 * Path to a file under RECOVERED_EDITS_DIR directory of the region found in <code>logEntry</code> 149 * named for the sequenceid in the passed <code>logEntry</code>: e.g. 150 * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of 151 * RECOVERED_EDITS_DIR under the region creating it if necessary. And also set storage policy for 152 * RECOVERED_EDITS_DIR if WAL_STORAGE_POLICY is configured. 153 * @param tableName the table name 154 * @param encodedRegionName the encoded region name 155 * @param seqId the sequence id which used to generate file name 156 * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name. 157 * @param tmpDirName of the directory used to sideline old recovered edits file 158 * @param conf configuration 159 * @return Path to file into which to dump split log edits. 160 */ 161 @SuppressWarnings("deprecation") 162 static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId, 163 String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException { 164 FileSystem walFS = CommonFSUtils.getWALFileSystem(conf); 165 Path tableDir = CommonFSUtils.getWALTableDir(conf, tableName); 166 String encodedRegionNameStr = Bytes.toString(encodedRegionName); 167 Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionNameStr); 168 Path dir = getRegionDirRecoveredEditsDir(regionDir); 169 170 if (walFS.exists(dir) && walFS.isFile(dir)) { 171 Path tmp = new Path(tmpDirName); 172 if (!walFS.exists(tmp)) { 173 walFS.mkdirs(tmp); 174 } 175 tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionNameStr); 176 LOG.warn("Found existing old file: {}. It could be some " 177 + "leftover of an old installation. It should be a folder instead. " + "So moving it to {}", 178 dir, tmp); 179 if (!walFS.rename(dir, tmp)) { 180 LOG.warn("Failed to sideline old file {}", dir); 181 } 182 } 183 184 if (!walFS.exists(dir) && !walFS.mkdirs(dir)) { 185 LOG.warn("mkdir failed on {}", dir); 186 } else { 187 String storagePolicy = 188 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 189 CommonFSUtils.setStoragePolicy(walFS, dir, storagePolicy); 190 } 191 // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now. 192 // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure 193 // region's replayRecoveredEdits will not delete it 194 String fileName = formatRecoveredEditsFileName(seqId); 195 fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit); 196 return new Path(dir, fileName); 197 } 198 199 private static String getTmpRecoveredEditsFileName(String fileName) { 200 return fileName + RECOVERED_LOG_TMPFILE_SUFFIX; 201 } 202 203 /** 204 * Get the completed recovered edits file path, renaming it to be by last edit in the file from 205 * its first edit. Then we could use the name to skip recovered edits when doing 206 * HRegion#replayRecoveredEditsIfAny(Map, CancelableProgressable, MonitoredTask). 207 * @return dstPath take file's last edit log seq num as the name 208 */ 209 static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) { 210 String fileName = formatRecoveredEditsFileName(maximumEditWALSeqNum); 211 return new Path(srcPath.getParent(), fileName); 212 } 213 214 static String formatRecoveredEditsFileName(final long seqid) { 215 return String.format("%019d", seqid); 216 } 217 218 /** 219 * @param regionDir This regions directory in the filesystem. 220 * @return The directory that holds recovered edits files for the region <code>regionDir</code> 221 */ 222 public static Path getRegionDirRecoveredEditsDir(final Path regionDir) { 223 return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR); 224 } 225 226 /** 227 * Check whether there is recovered.edits in the region dir 228 * @param conf conf 229 * @param regionInfo the region to check 230 * @return true if recovered.edits exist in the region dir 231 */ 232 public static boolean hasRecoveredEdits(final Configuration conf, final RegionInfo regionInfo) 233 throws IOException { 234 // No recovered.edits for non default replica regions 235 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 236 return false; 237 } 238 // Only default replica region can reach here, so we can use regioninfo 239 // directly without converting it to default replica's regioninfo. 240 Path regionWALDir = 241 CommonFSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName()); 242 Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), regionInfo); 243 Path wrongRegionWALDir = 244 CommonFSUtils.getWrongWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName()); 245 FileSystem walFs = CommonFSUtils.getWALFileSystem(conf); 246 FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf); 247 NavigableSet<Path> files = getSplitEditFilesSorted(walFs, regionWALDir); 248 if (!files.isEmpty()) { 249 return true; 250 } 251 files = getSplitEditFilesSorted(rootFs, regionDir); 252 if (!files.isEmpty()) { 253 return true; 254 } 255 files = getSplitEditFilesSorted(walFs, wrongRegionWALDir); 256 return !files.isEmpty(); 257 } 258 259 /** 260 * This method will check 3 places for finding the max sequence id file. One is the expected 261 * place, another is the old place under the region directory, and the last one is the wrong one 262 * we introduced in HBASE-20734. See HBASE-22617 for more details. 263 * <p/> 264 * Notice that, you should always call this method instead of 265 * {@link #getMaxRegionSequenceId(FileSystem, Path)} until 4.0.0 release. 266 * @deprecated Only for compatibility, will be removed in 4.0.0. 267 */ 268 @Deprecated 269 public static long getMaxRegionSequenceId(Configuration conf, RegionInfo region, 270 IOExceptionSupplier<FileSystem> rootFsSupplier, IOExceptionSupplier<FileSystem> walFsSupplier) 271 throws IOException { 272 FileSystem rootFs = rootFsSupplier.get(); 273 FileSystem walFs = walFsSupplier.get(); 274 Path regionWALDir = 275 CommonFSUtils.getWALRegionDir(conf, region.getTable(), region.getEncodedName()); 276 // This is the old place where we store max sequence id file 277 Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), region); 278 // This is for HBASE-20734, where we use a wrong directory, see HBASE-22617 for more details. 279 Path wrongRegionWALDir = 280 CommonFSUtils.getWrongWALRegionDir(conf, region.getTable(), region.getEncodedName()); 281 long maxSeqId = getMaxRegionSequenceId(walFs, regionWALDir); 282 maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(rootFs, regionDir)); 283 maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(walFs, wrongRegionWALDir)); 284 return maxSeqId; 285 } 286 287 /** 288 * Returns sorted set of edit files made by splitter, excluding files with '.temp' suffix. 289 * @param walFS WAL FileSystem used to retrieving split edits files. 290 * @param regionDir WAL region dir to look for recovered edits files under. 291 * @return Files in passed <code>regionDir</code> as a sorted set. 292 */ 293 public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS, 294 final Path regionDir) throws IOException { 295 NavigableSet<Path> filesSorted = new TreeSet<>(); 296 Path editsdir = getRegionDirRecoveredEditsDir(regionDir); 297 if (!walFS.exists(editsdir)) { 298 return filesSorted; 299 } 300 FileStatus[] files = CommonFSUtils.listStatus(walFS, editsdir, new PathFilter() { 301 @Override 302 public boolean accept(Path p) { 303 boolean result = false; 304 try { 305 // Return files and only files that match the editfile names pattern. 306 // There can be other files in this directory other than edit files. 307 // In particular, on error, we'll move aside the bad edit file giving 308 // it a timestamp suffix. See moveAsideBadEditsFile. 309 Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName()); 310 result = walFS.isFile(p) && m.matches(); 311 // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX, 312 // because it means splitwal thread is writting this file. 313 if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) { 314 result = false; 315 } 316 // Skip SeqId Files 317 if (isSequenceIdFile(p)) { 318 result = false; 319 } 320 } catch (IOException e) { 321 LOG.warn("Failed isFile check on {}", p, e); 322 } 323 return result; 324 } 325 }); 326 if (ArrayUtils.isNotEmpty(files)) { 327 Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath())); 328 } 329 return filesSorted; 330 } 331 332 /** 333 * Move aside a bad edits file. 334 * @param fs the file system used to rename bad edits file. 335 * @param edits Edits file to move aside. 336 * @return The name of the moved aside file. 337 */ 338 public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits) 339 throws IOException { 340 Path moveAsideName = 341 new Path(edits.getParent(), edits.getName() + "." + EnvironmentEdgeManager.currentTime()); 342 if (!fs.rename(edits, moveAsideName)) { 343 LOG.warn("Rename failed from {} to {}", edits, moveAsideName); 344 } 345 return moveAsideName; 346 } 347 348 /** 349 * Is the given file a region open sequence id file. 350 */ 351 public static boolean isSequenceIdFile(final Path file) { 352 return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX) 353 || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX); 354 } 355 356 private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir) 357 throws IOException { 358 // TODO: Why are we using a method in here as part of our normal region open where 359 // there is no splitting involved? Fix. St.Ack 01/20/2017. 360 Path editsDir = getRegionDirRecoveredEditsDir(regionDir); 361 try { 362 FileStatus[] files = walFS.listStatus(editsDir, WALSplitUtil::isSequenceIdFile); 363 return files != null ? files : new FileStatus[0]; 364 } catch (FileNotFoundException e) { 365 return new FileStatus[0]; 366 } 367 } 368 369 private static long getMaxSequenceId(FileStatus[] files) { 370 long maxSeqId = -1L; 371 for (FileStatus file : files) { 372 String fileName = file.getPath().getName(); 373 try { 374 maxSeqId = Math.max(maxSeqId, Long 375 .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH))); 376 } catch (NumberFormatException ex) { 377 LOG.warn("Invalid SeqId File Name={}", fileName); 378 } 379 } 380 return maxSeqId; 381 } 382 383 /** 384 * Get the max sequence id which is stored in the region directory. -1 if none. 385 */ 386 public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException { 387 return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir)); 388 } 389 390 /** 391 * Create a file with name as region's max sequence id 392 */ 393 public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId) 394 throws IOException { 395 FileStatus[] files = getSequenceIdFiles(walFS, regionDir); 396 long maxSeqId = getMaxSequenceId(files); 397 if (maxSeqId > newMaxSeqId) { 398 throw new IOException("The new max sequence id " + newMaxSeqId 399 + " is less than the old max sequence id " + maxSeqId); 400 } 401 // write a new seqId file 402 Path newSeqIdFile = 403 new Path(getRegionDirRecoveredEditsDir(regionDir), newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX); 404 if (newMaxSeqId != maxSeqId) { 405 try { 406 if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) { 407 throw new IOException("Failed to create SeqId file:" + newSeqIdFile); 408 } 409 LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId, 410 maxSeqId); 411 } catch (FileAlreadyExistsException ignored) { 412 // latest hdfs throws this exception. it's all right if newSeqIdFile already exists 413 } 414 } 415 // remove old ones 416 for (FileStatus status : files) { 417 if (!newSeqIdFile.equals(status.getPath())) { 418 walFS.delete(status.getPath(), false); 419 } 420 } 421 } 422 423 /** A struct used by getMutationsFromWALEntry */ 424 public static class MutationReplay implements Comparable<MutationReplay> { 425 public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation, 426 long nonceGroup, long nonce) { 427 this.type = type; 428 this.mutation = mutation; 429 if (this.mutation.getDurability() != Durability.SKIP_WAL) { 430 // using ASYNC_WAL for relay 431 this.mutation.setDurability(Durability.ASYNC_WAL); 432 } 433 this.nonceGroup = nonceGroup; 434 this.nonce = nonce; 435 } 436 437 private final ClientProtos.MutationProto.MutationType type; 438 @SuppressWarnings("checkstyle:VisibilityModifier") 439 public final Mutation mutation; 440 @SuppressWarnings("checkstyle:VisibilityModifier") 441 public final long nonceGroup; 442 @SuppressWarnings("checkstyle:VisibilityModifier") 443 public final long nonce; 444 445 @Override 446 public int compareTo(final MutationReplay d) { 447 return this.mutation.compareTo(d.mutation); 448 } 449 450 @Override 451 public boolean equals(Object obj) { 452 if (!(obj instanceof MutationReplay)) { 453 return false; 454 } else { 455 return this.compareTo((MutationReplay) obj) == 0; 456 } 457 } 458 459 @Override 460 public int hashCode() { 461 return this.mutation.hashCode(); 462 } 463 464 public ClientProtos.MutationProto.MutationType getType() { 465 return type; 466 } 467 } 468 469 /** 470 * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey & 471 * WALEdit from the passed in WALEntry 472 * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances 473 * extracted from the passed in WALEntry. 474 * @return list of Pair<MutationType, Mutation> to be replayed 475 */ 476 public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry, 477 CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException { 478 if (entry == null) { 479 // return an empty array 480 return Collections.emptyList(); 481 } 482 483 long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) 484 ? entry.getKey().getOrigSequenceNumber() 485 : entry.getKey().getLogSequenceNumber(); 486 int count = entry.getAssociatedCellCount(); 487 List<MutationReplay> mutations = new ArrayList<>(); 488 Cell previousCell = null; 489 Mutation m = null; 490 WALKeyImpl key = null; 491 WALEdit val = null; 492 if (logEntry != null) { 493 val = new WALEdit(); 494 } 495 496 for (int i = 0; i < count; i++) { 497 // Throw index out of bounds if our cell count is off 498 if (!cells.advance()) { 499 throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); 500 } 501 Cell cell = cells.current(); 502 if (val != null) { 503 val.add(cell); 504 } 505 506 boolean isNewRowOrType = 507 previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() 508 || !CellUtil.matchingRows(previousCell, cell); 509 if (isNewRowOrType) { 510 // Create new mutation 511 if (CellUtil.isDelete(cell)) { 512 m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); 513 // Deletes don't have nonces. 514 mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.DELETE, m, 515 HConstants.NO_NONCE, HConstants.NO_NONCE)); 516 } else { 517 m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); 518 // Puts might come from increment or append, thus we need nonces. 519 long nonceGroup = 520 entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; 521 long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; 522 mutations.add( 523 new MutationReplay(ClientProtos.MutationProto.MutationType.PUT, m, nonceGroup, nonce)); 524 } 525 } 526 if (CellUtil.isDelete(cell)) { 527 ((Delete) m).add(cell); 528 } else { 529 ((Put) m).add(cell); 530 } 531 m.setDurability(durability); 532 previousCell = cell; 533 } 534 535 // reconstruct WALKey 536 if (logEntry != null) { 537 org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto = 538 entry.getKey(); 539 List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount()); 540 for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) { 541 clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits())); 542 } 543 key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), 544 TableName.valueOf(walKeyProto.getTableName().toByteArray()), replaySeqId, 545 walKeyProto.getWriteTime(), clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), 546 null); 547 logEntry.setFirst(key); 548 logEntry.setSecond(val); 549 } 550 551 return mutations; 552 } 553 554 /** 555 * Return path to recovered.hfiles directory of the region's column family: e.g. 556 * /hbase/some_table/2323432434/cf/recovered.hfiles/. This method also ensures existence of 557 * recovered.hfiles directory under the region's column family, creating it if necessary. 558 * @param rootFS the root file system 559 * @param conf configuration 560 * @param tableName the table name 561 * @param encodedRegionName the encoded region name 562 * @param familyName the column family name 563 * @return Path to recovered.hfiles directory of the region's column family. 564 */ 565 static Path tryCreateRecoveredHFilesDir(FileSystem rootFS, Configuration conf, 566 TableName tableName, String encodedRegionName, String familyName) throws IOException { 567 Path rootDir = CommonFSUtils.getRootDir(conf); 568 Path regionDir = FSUtils.getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, tableName), 569 encodedRegionName); 570 Path dir = getRecoveredHFilesDir(regionDir, familyName); 571 if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) { 572 LOG.warn("mkdir failed on {}, region {}, column family {}", dir, encodedRegionName, 573 familyName); 574 } 575 return dir; 576 } 577 578 /** 579 * @param regionDir This regions directory in the filesystem 580 * @param familyName The column family name 581 * @return The directory that holds recovered hfiles for the region's column family 582 */ 583 private static Path getRecoveredHFilesDir(final Path regionDir, String familyName) { 584 return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR); 585 } 586 587 public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS, final Path regionDir, 588 String familyName) throws IOException { 589 Path dir = getRecoveredHFilesDir(regionDir, familyName); 590 return CommonFSUtils.listStatus(rootFS, dir); 591 } 592}