001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.List; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileStatus; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 031import org.apache.hadoop.hbase.exceptions.DeserializationException; 032import org.apache.hadoop.hbase.util.CommonFSUtils; 033import org.apache.hadoop.hbase.util.RotateFile; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * A filesystem based replication peer storage. The implementation does not require atomic rename so 040 * you can use it on cloud OSS. 041 * <p/> 042 * FileSystem layout: 043 * 044 * <pre> 045 * hbase 046 * | 047 * --peers 048 * | 049 * --<peer_id> 050 * | 051 * --peer_config 052 * | 053 * --disabled 054 * | 055 * --sync-rep-state 056 * </pre> 057 * 058 * Notice that, if the peer is enabled, we will not have a disabled file. 059 * <p/> 060 * And for other files, to avoid depending on atomic rename, we will use two files for storing the 061 * content. When loading, we will try to read both the files and load the newer one. And when 062 * writing, we will write to the older file. 063 */ 064@InterfaceAudience.Private 065public class FSReplicationPeerStorage implements ReplicationPeerStorage { 066 067 private static final Logger LOG = LoggerFactory.getLogger(FSReplicationPeerStorage.class); 068 069 public static final String PEERS_DIR = "hbase.replication.peers.directory"; 070 071 public static final String PEERS_DIR_DEFAULT = "peers"; 072 073 static final String PEER_CONFIG_FILE = "peer_config"; 074 075 static final String DISABLED_FILE = "disabled"; 076 077 private final FileSystem fs; 078 079 private final Path dir; 080 081 public FSReplicationPeerStorage(FileSystem fs, Configuration conf) throws IOException { 082 this.fs = fs; 083 this.dir = new Path(CommonFSUtils.getRootDir(conf), conf.get(PEERS_DIR, PEERS_DIR_DEFAULT)); 084 } 085 086 @RestrictedApi(explanation = "Should only be called in tests", link = "", 087 allowedOnPath = ".*/FSReplicationPeerStorage.java|.*/src/test/.*") 088 Path getPeerDir(String peerId) { 089 return new Path(dir, peerId); 090 } 091 092 @Override 093 public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) 094 throws ReplicationException { 095 Path peerDir = getPeerDir(peerId); 096 try { 097 if (fs.exists(peerDir)) { 098 // check whether this is a valid peer, if so we should fail the add peer operation 099 if (read(fs, peerDir, PEER_CONFIG_FILE) != null) { 100 throw new ReplicationException( 101 "Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state=" 102 + (enabled ? "ENABLED" : "DISABLED") + ", peer already exists"); 103 } 104 } 105 if (!enabled) { 106 fs.createNewFile(new Path(peerDir, DISABLED_FILE)); 107 } 108 // write the peer config data at last, so when loading, if we can not load the peer_config, we 109 // know that this is not a valid peer 110 write(fs, peerDir, PEER_CONFIG_FILE, ReplicationPeerConfigUtil.toByteArray(peerConfig)); 111 } catch (IOException e) { 112 throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfig=>" 113 + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e); 114 } 115 } 116 117 @Override 118 public void removePeer(String peerId) throws ReplicationException { 119 // delete the peer config first, and then delete the directory 120 // we will consider this is not a valid peer by reading the peer config file 121 Path peerDir = getPeerDir(peerId); 122 try { 123 delete(fs, peerDir, PEER_CONFIG_FILE); 124 if (!fs.delete(peerDir, true)) { 125 throw new IOException("Can not delete " + peerDir); 126 } 127 } catch (IOException e) { 128 throw new ReplicationException("Could not remove peer with id=" + peerId, e); 129 } 130 } 131 132 @Override 133 public void setPeerState(String peerId, boolean enabled) throws ReplicationException { 134 Path disabledFile = new Path(getPeerDir(peerId), DISABLED_FILE); 135 try { 136 if (enabled) { 137 if (fs.exists(disabledFile) && !fs.delete(disabledFile, false)) { 138 throw new IOException("Can not delete " + disabledFile); 139 } 140 } else { 141 if (!fs.exists(disabledFile) && !fs.createNewFile(disabledFile)) { 142 throw new IOException("Can not touch " + disabledFile); 143 } 144 } 145 } catch (IOException e) { 146 throw new ReplicationException( 147 "Unable to change state of the peer with id=" + peerId + " to " + enabled, e); 148 } 149 } 150 151 @Override 152 public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) 153 throws ReplicationException { 154 Path peerDir = getPeerDir(peerId); 155 try { 156 write(fs, peerDir, PEER_CONFIG_FILE, ReplicationPeerConfigUtil.toByteArray(peerConfig)); 157 } catch (IOException e) { 158 throw new ReplicationException( 159 "There was a problem trying to save changes to the " + "replication peer " + peerId, e); 160 } 161 } 162 163 @Override 164 public List<String> listPeerIds() throws ReplicationException { 165 try { 166 FileStatus[] statuses = fs.listStatus(dir); 167 if (statuses == null || statuses.length == 0) { 168 return Collections.emptyList(); 169 } 170 List<String> peerIds = new ArrayList<>(); 171 for (FileStatus status : statuses) { 172 String peerId = status.getPath().getName(); 173 Path peerDir = getPeerDir(peerId); 174 // confirm that this is a valid peer 175 byte[] peerConfigData = read(fs, peerDir, PEER_CONFIG_FILE); 176 if (peerConfigData != null) { 177 peerIds.add(peerId); 178 } 179 } 180 return Collections.unmodifiableList(peerIds); 181 } catch (FileNotFoundException e) { 182 LOG.debug("Peer directory does not exist yet", e); 183 return Collections.emptyList(); 184 } catch (IOException e) { 185 throw new ReplicationException("Cannot get the list of peers", e); 186 } 187 } 188 189 @Override 190 public boolean isPeerEnabled(String peerId) throws ReplicationException { 191 Path disabledFile = new Path(getPeerDir(peerId), DISABLED_FILE); 192 try { 193 return !fs.exists(disabledFile); 194 } catch (IOException e) { 195 throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e); 196 } 197 } 198 199 @Override 200 public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException { 201 Path peerDir = getPeerDir(peerId); 202 byte[] data; 203 try { 204 data = read(fs, peerDir, PEER_CONFIG_FILE); 205 } catch (IOException e) { 206 throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e); 207 } 208 if (data == null || data.length == 0) { 209 throw new ReplicationException( 210 "Replication peer config data shouldn't be empty, peerId=" + peerId); 211 } 212 try { 213 return ReplicationPeerConfigUtil.parsePeerFrom(data); 214 } catch (DeserializationException e) { 215 throw new ReplicationException( 216 "Failed to parse replication peer config for peer with id=" + peerId, e); 217 } 218 } 219 220 // 16 MB is big enough for our usage here 221 private static final long MAX_FILE_SIZE = 16 * 1024 * 1024; 222 223 private static byte[] read(FileSystem fs, Path dir, String name) throws IOException { 224 RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE); 225 return file.read(); 226 } 227 228 private static void write(FileSystem fs, Path dir, String name, byte[] data) throws IOException { 229 RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE); 230 // to initialize the nextFile index 231 file.read(); 232 file.write(data); 233 } 234 235 private static void delete(FileSystem fs, Path dir, String name) throws IOException { 236 RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE); 237 // to initialize the nextFile index 238 file.read(); 239 file.delete(); 240 } 241}