001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; 021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 022import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER; 023import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER; 024 025import java.io.IOException; 026import java.util.Arrays; 027import java.util.Collections; 028import java.util.List; 029import java.util.Optional; 030import java.util.stream.Collectors; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileStatus; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; 039import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure; 040import org.apache.hadoop.hbase.procedure2.Procedure; 041import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 042import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 043import org.apache.hadoop.hbase.util.CommonFSUtils; 044import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 045import org.apache.hadoop.hbase.wal.WALSplitUtil; 046import org.apache.yetus.audience.InterfaceAudience; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * Create {@link SplitWALProcedure} for each WAL which need to split. Manage the workers for each 052 * {@link SplitWALProcedure}. Total number of workers is (number of online servers) * 053 * (HBASE_SPLIT_WAL_MAX_SPLITTER). Helps assign and release workers for split tasks. Provide helper 054 * method to delete split WAL file and directory. The user can get the SplitWALProcedures via 055 * splitWALs(crashedServer, splitMeta) can get the files that need to split via 056 * getWALsToSplit(crashedServer, splitMeta) can delete the splitting WAL and directory via 057 * deleteSplitWAL(wal) and deleteSplitWAL(crashedServer) can check if splitting WALs of a crashed 058 * server is success via isSplitWALFinished(walPath) can acquire and release a worker for splitting 059 * WAL via acquireSplitWALWorker(procedure) and releaseSplitWALWorker(worker, scheduler) This class 060 * is to replace the zk-based WAL splitting related code, {@link MasterWalManager}, 061 * {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and 062 * {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed after 063 * we switch to procedure-based WAL splitting. 064 * @see SplitLogManager for the original distributed split WAL manager. 065 */ 066@InterfaceAudience.Private 067public class SplitWALManager { 068 private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class); 069 070 private final MasterServices master; 071 private final WorkerAssigner splitWorkerAssigner; 072 private final Path rootDir; 073 private final FileSystem fs; 074 private final Configuration conf; 075 private final Path walArchiveDir; 076 077 public SplitWALManager(MasterServices master) throws IOException { 078 this.master = master; 079 this.conf = master.getConfiguration(); 080 this.splitWorkerAssigner = new WorkerAssigner(this.master, 081 conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER), 082 new ProcedureEvent<>("split-WAL-worker-assigning")); 083 this.rootDir = master.getMasterFileSystem().getWALRootDir(); 084 this.fs = master.getMasterFileSystem().getWALFileSystem(); 085 this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME); 086 } 087 088 public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta) throws IOException { 089 try { 090 // 1. list all splitting files 091 List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, splitMeta); 092 // 2. create corresponding procedures 093 return createSplitWALProcedures(splittingFiles, crashedServer); 094 } catch (IOException e) { 095 LOG.error("Failed to create procedures for splitting WALs of {}", crashedServer, e); 096 throw e; 097 } 098 } 099 100 public List<FileStatus> getWALsToSplit(ServerName serverName, boolean splitMeta) 101 throws IOException { 102 List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName)); 103 List<FileStatus> fileStatuses = 104 SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER); 105 LOG.info("{} WAL count={}, meta={}", serverName, fileStatuses.size(), splitMeta); 106 return fileStatuses; 107 } 108 109 private Path getWALSplitDir(ServerName serverName) { 110 Path logDir = 111 new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); 112 return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); 113 } 114 115 /** 116 * Archive processed WAL 117 */ 118 public void archive(String wal) throws IOException { 119 WALSplitUtil.moveWAL(this.fs, new Path(wal), this.walArchiveDir); 120 } 121 122 public void deleteWALDir(ServerName serverName) throws IOException { 123 Path splitDir = getWALSplitDir(serverName); 124 try { 125 if (!fs.delete(splitDir, false)) { 126 LOG.warn("Failed delete {}, contains {}", splitDir, fs.listFiles(splitDir, true)); 127 } 128 } catch (PathIsNotEmptyDirectoryException e) { 129 FileStatus[] files = CommonFSUtils.listStatus(fs, splitDir); 130 LOG.warn("PathIsNotEmptyDirectoryException {}", 131 Arrays.stream(files).map(f -> f.getPath()).collect(Collectors.toList())); 132 throw e; 133 } 134 } 135 136 public boolean isSplitWALFinished(String walPath) throws IOException { 137 return !fs.exists(new Path(rootDir, walPath)); 138 } 139 140 List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs, 141 ServerName crashedServer) { 142 return splittingWALs.stream() 143 .map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer)) 144 .collect(Collectors.toList()); 145 } 146 147 /** 148 * Acquire a split WAL worker 149 * @param procedure split WAL task 150 * @return an available region server which could execute this task 151 * @throws ProcedureSuspendedException if there is no available worker, it will throw this 152 * exception to WAIT the procedure. 153 */ 154 public ServerName acquireSplitWALWorker(Procedure<?> procedure) 155 throws ProcedureSuspendedException { 156 Optional<ServerName> worker = splitWorkerAssigner.acquire(); 157 if (worker.isPresent()) { 158 LOG.debug("Acquired split WAL worker={}", worker.get()); 159 return worker.get(); 160 } 161 splitWorkerAssigner.suspend(procedure); 162 throw new ProcedureSuspendedException(); 163 } 164 165 /** 166 * After the worker finished the split WAL task, it will release the worker, and wake up all the 167 * suspend procedures in the ProcedureEvent 168 * @param worker worker which is about to release 169 * @param scheduler scheduler which is to wake up the procedure event 170 */ 171 public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) { 172 LOG.debug("Release split WAL worker={}", worker); 173 splitWorkerAssigner.release(worker); 174 splitWorkerAssigner.wake(scheduler); 175 } 176 177 /** 178 * When master restart, there will be a new splitWorkerAssigner. But if there are splitting WAL 179 * tasks running on the region server side, they will not be count by the new splitWorkerAssigner. 180 * Thus we should add the workers of running tasks to the assigner when we load the procedures 181 * from MasterProcWALs. 182 * @param worker region server which is executing a split WAL task 183 */ 184 public void addUsedSplitWALWorker(ServerName worker) { 185 splitWorkerAssigner.addUsedWorker(worker); 186 } 187}