001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.List; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileStatus; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 031import org.apache.hadoop.hbase.exceptions.DeserializationException; 032import org.apache.hadoop.hbase.util.CommonFSUtils; 033import org.apache.hadoop.hbase.util.Pair; 034import org.apache.hadoop.hbase.util.RotateFile; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * A filesystem based replication peer storage. The implementation does not require atomic rename so 041 * you can use it on cloud OSS. 042 * <p/> 043 * FileSystem layout: 044 * 045 * <pre> 046 * hbase 047 * | 048 * --peers 049 * | 050 * --<peer_id> 051 * | 052 * --peer_config 053 * | 054 * --disabled 055 * | 056 * --sync-rep-state 057 * </pre> 058 * 059 * Notice that, if the peer is enabled, we will not have a disabled file. 060 * <p/> 061 * And for other files, to avoid depending on atomic rename, we will use two files for storing the 062 * content. When loading, we will try to read both the files and load the newer one. And when 063 * writing, we will write to the older file. 064 */ 065@InterfaceAudience.Private 066public class FSReplicationPeerStorage implements ReplicationPeerStorage { 067 068 private static final Logger LOG = LoggerFactory.getLogger(FSReplicationPeerStorage.class); 069 070 public static final String PEERS_DIR = "hbase.replication.peers.directory"; 071 072 public static final String PEERS_DIR_DEFAULT = "peers"; 073 074 static final String PEER_CONFIG_FILE = "peer_config"; 075 076 static final String DISABLED_FILE = "disabled"; 077 078 static final String SYNC_REPLICATION_STATE_FILE = "sync-rep-state"; 079 080 static final byte[] NONE_STATE_BYTES = 081 SyncReplicationState.toByteArray(SyncReplicationState.NONE); 082 083 private final FileSystem fs; 084 085 private final Path dir; 086 087 public FSReplicationPeerStorage(FileSystem fs, Configuration conf) throws IOException { 088 this.fs = fs; 089 this.dir = new Path(CommonFSUtils.getRootDir(conf), conf.get(PEERS_DIR, PEERS_DIR_DEFAULT)); 090 } 091 092 @RestrictedApi(explanation = "Should only be called in tests", link = "", 093 allowedOnPath = ".*/FSReplicationPeerStorage.java|.*/src/test/.*") 094 Path getPeerDir(String peerId) { 095 return new Path(dir, peerId); 096 } 097 098 @Override 099 public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled, 100 SyncReplicationState syncReplicationState) throws ReplicationException { 101 Path peerDir = getPeerDir(peerId); 102 try { 103 if (fs.exists(peerDir)) { 104 // check whether this is a valid peer, if so we should fail the add peer operation 105 if (read(fs, peerDir, PEER_CONFIG_FILE) != null) { 106 throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfig=>" 107 + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED") 108 + ", syncReplicationState=" + syncReplicationState + ", peer already exists"); 109 } 110 } 111 if (!enabled) { 112 fs.createNewFile(new Path(peerDir, DISABLED_FILE)); 113 } 114 write(fs, peerDir, SYNC_REPLICATION_STATE_FILE, 115 SyncReplicationState.toByteArray(syncReplicationState, SyncReplicationState.NONE)); 116 // write the peer config data at last, so when loading, if we can not load the peer_config, we 117 // know that this is not a valid peer 118 write(fs, peerDir, PEER_CONFIG_FILE, ReplicationPeerConfigUtil.toByteArray(peerConfig)); 119 } catch (IOException e) { 120 throw new ReplicationException( 121 "Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state=" 122 + (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState, 123 e); 124 } 125 } 126 127 @Override 128 public void removePeer(String peerId) throws ReplicationException { 129 // delete the peer config first, and then delete the directory 130 // we will consider this is not a valid peer by reading the peer config file 131 Path peerDir = getPeerDir(peerId); 132 try { 133 delete(fs, peerDir, PEER_CONFIG_FILE); 134 if (!fs.delete(peerDir, true)) { 135 throw new IOException("Can not delete " + peerDir); 136 } 137 } catch (IOException e) { 138 throw new ReplicationException("Could not remove peer with id=" + peerId, e); 139 } 140 } 141 142 @Override 143 public void setPeerState(String peerId, boolean enabled) throws ReplicationException { 144 Path disabledFile = new Path(getPeerDir(peerId), DISABLED_FILE); 145 try { 146 if (enabled) { 147 if (fs.exists(disabledFile) && !fs.delete(disabledFile, false)) { 148 throw new IOException("Can not delete " + disabledFile); 149 } 150 } else { 151 if (!fs.exists(disabledFile) && !fs.createNewFile(disabledFile)) { 152 throw new IOException("Can not touch " + disabledFile); 153 } 154 } 155 } catch (IOException e) { 156 throw new ReplicationException( 157 "Unable to change state of the peer with id=" + peerId + " to " + enabled, e); 158 } 159 } 160 161 @Override 162 public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) 163 throws ReplicationException { 164 Path peerDir = getPeerDir(peerId); 165 try { 166 write(fs, peerDir, PEER_CONFIG_FILE, ReplicationPeerConfigUtil.toByteArray(peerConfig)); 167 } catch (IOException e) { 168 throw new ReplicationException( 169 "There was a problem trying to save changes to the " + "replication peer " + peerId, e); 170 } 171 } 172 173 @Override 174 public List<String> listPeerIds() throws ReplicationException { 175 try { 176 FileStatus[] statuses = fs.listStatus(dir); 177 if (statuses == null || statuses.length == 0) { 178 return Collections.emptyList(); 179 } 180 List<String> peerIds = new ArrayList<>(); 181 for (FileStatus status : statuses) { 182 String peerId = status.getPath().getName(); 183 Path peerDir = getPeerDir(peerId); 184 // confirm that this is a valid peer 185 byte[] peerConfigData = read(fs, peerDir, PEER_CONFIG_FILE); 186 if (peerConfigData != null) { 187 peerIds.add(peerId); 188 } 189 } 190 return Collections.unmodifiableList(peerIds); 191 } catch (FileNotFoundException e) { 192 LOG.debug("Peer directory does not exist yet", e); 193 return Collections.emptyList(); 194 } catch (IOException e) { 195 throw new ReplicationException("Cannot get the list of peers", e); 196 } 197 } 198 199 @Override 200 public boolean isPeerEnabled(String peerId) throws ReplicationException { 201 Path disabledFile = new Path(getPeerDir(peerId), DISABLED_FILE); 202 try { 203 return !fs.exists(disabledFile); 204 } catch (IOException e) { 205 throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e); 206 } 207 } 208 209 @Override 210 public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException { 211 Path peerDir = getPeerDir(peerId); 212 byte[] data; 213 try { 214 data = read(fs, peerDir, PEER_CONFIG_FILE); 215 } catch (IOException e) { 216 throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e); 217 } 218 if (data == null || data.length == 0) { 219 throw new ReplicationException( 220 "Replication peer config data shouldn't be empty, peerId=" + peerId); 221 } 222 try { 223 return ReplicationPeerConfigUtil.parsePeerFrom(data); 224 } catch (DeserializationException e) { 225 throw new ReplicationException( 226 "Failed to parse replication peer config for peer with id=" + peerId, e); 227 } 228 } 229 230 private Pair<SyncReplicationState, SyncReplicationState> getStateAndNewState(String peerId) 231 throws IOException { 232 Path peerDir = getPeerDir(peerId); 233 if (!fs.exists(peerDir)) { 234 throw new IOException("peer does not exists"); 235 } 236 byte[] data = read(fs, peerDir, SYNC_REPLICATION_STATE_FILE); 237 if (data == null) { 238 // should be a peer from previous version, set the sync replication state for it. 239 write(fs, peerDir, SYNC_REPLICATION_STATE_FILE, 240 SyncReplicationState.toByteArray(SyncReplicationState.NONE, SyncReplicationState.NONE)); 241 return Pair.newPair(SyncReplicationState.NONE, SyncReplicationState.NONE); 242 } else { 243 return SyncReplicationState.parseStateAndNewStateFrom(data); 244 } 245 } 246 247 @Override 248 public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState newState) 249 throws ReplicationException { 250 Path peerDir = getPeerDir(peerId); 251 try { 252 Pair<SyncReplicationState, SyncReplicationState> stateAndNewState = 253 getStateAndNewState(peerId); 254 write(fs, peerDir, SYNC_REPLICATION_STATE_FILE, 255 SyncReplicationState.toByteArray(stateAndNewState.getFirst(), newState)); 256 } catch (IOException e) { 257 throw new ReplicationException( 258 "Unable to set the new sync replication state for peer with id=" + peerId + ", newState=" 259 + newState, 260 e); 261 } 262 } 263 264 @Override 265 public void transitPeerSyncReplicationState(String peerId) throws ReplicationException { 266 Path peerDir = getPeerDir(peerId); 267 try { 268 Pair<SyncReplicationState, SyncReplicationState> stateAndNewState = 269 getStateAndNewState(peerId); 270 write(fs, peerDir, SYNC_REPLICATION_STATE_FILE, 271 SyncReplicationState.toByteArray(stateAndNewState.getSecond(), SyncReplicationState.NONE)); 272 } catch (IOException e) { 273 throw new ReplicationException( 274 "Error transiting sync replication state for peer with id=" + peerId, e); 275 } 276 } 277 278 @Override 279 public SyncReplicationState getPeerSyncReplicationState(String peerId) 280 throws ReplicationException { 281 try { 282 return getStateAndNewState(peerId).getFirst(); 283 } catch (IOException e) { 284 throw new ReplicationException( 285 "Error getting sync replication state for peer with id=" + peerId, e); 286 } 287 } 288 289 @Override 290 public SyncReplicationState getPeerNewSyncReplicationState(String peerId) 291 throws ReplicationException { 292 try { 293 return getStateAndNewState(peerId).getSecond(); 294 } catch (IOException e) { 295 throw new ReplicationException( 296 "Error getting new sync replication state for peer with id=" + peerId, e); 297 } 298 } 299 300 // 16 MB is big enough for our usage here 301 private static final long MAX_FILE_SIZE = 16 * 1024 * 1024; 302 303 private static byte[] read(FileSystem fs, Path dir, String name) throws IOException { 304 RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE); 305 return file.read(); 306 } 307 308 private static void write(FileSystem fs, Path dir, String name, byte[] data) throws IOException { 309 RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE); 310 // to initialize the nextFile index 311 file.read(); 312 file.write(data); 313 } 314 315 private static void delete(FileSystem fs, Path dir, String name) throws IOException { 316 RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE); 317 // to initialize the nextFile index 318 file.read(); 319 file.delete(); 320 } 321}