001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.List;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileStatus;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
031import org.apache.hadoop.hbase.exceptions.DeserializationException;
032import org.apache.hadoop.hbase.util.CommonFSUtils;
033import org.apache.hadoop.hbase.util.RotateFile;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * A filesystem based replication peer storage. The implementation does not require atomic rename so
040 * you can use it on cloud OSS.
041 * <p/>
042 * FileSystem layout:
043 *
044 * <pre>
045 * hbase
046 *   |
047 *   --peers
048 *       |
049 *       --&lt;peer_id&gt;
050 *           |
051 *           --peer_config
052 *           |
053 *           --disabled
054 *           |
055 *           --sync-rep-state
056 * </pre>
057 *
058 * Notice that, if the peer is enabled, we will not have a disabled file.
059 * <p/>
060 * And for other files, to avoid depending on atomic rename, we will use two files for storing the
061 * content. When loading, we will try to read both the files and load the newer one. And when
062 * writing, we will write to the older file.
063 */
064@InterfaceAudience.Private
065public class FSReplicationPeerStorage implements ReplicationPeerStorage {
066
067  private static final Logger LOG = LoggerFactory.getLogger(FSReplicationPeerStorage.class);
068
069  public static final String PEERS_DIR = "hbase.replication.peers.directory";
070
071  public static final String PEERS_DIR_DEFAULT = "peers";
072
073  static final String PEER_CONFIG_FILE = "peer_config";
074
075  static final String DISABLED_FILE = "disabled";
076
077  private final FileSystem fs;
078
079  private final Path dir;
080
081  public FSReplicationPeerStorage(FileSystem fs, Configuration conf) throws IOException {
082    this.fs = fs;
083    this.dir = new Path(CommonFSUtils.getRootDir(conf), conf.get(PEERS_DIR, PEERS_DIR_DEFAULT));
084  }
085
086  @RestrictedApi(explanation = "Should only be called in tests", link = "",
087      allowedOnPath = ".*/FSReplicationPeerStorage.java|.*/src/test/.*")
088  Path getPeerDir(String peerId) {
089    return new Path(dir, peerId);
090  }
091
092  @Override
093  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
094    throws ReplicationException {
095    Path peerDir = getPeerDir(peerId);
096    try {
097      if (fs.exists(peerDir)) {
098        // check whether this is a valid peer, if so we should fail the add peer operation
099        if (read(fs, peerDir, PEER_CONFIG_FILE) != null) {
100          throw new ReplicationException(
101            "Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state="
102              + (enabled ? "ENABLED" : "DISABLED") + ", peer already exists");
103        }
104      }
105      if (!enabled) {
106        fs.createNewFile(new Path(peerDir, DISABLED_FILE));
107      }
108      // write the peer config data at last, so when loading, if we can not load the peer_config, we
109      // know that this is not a valid peer
110      write(fs, peerDir, PEER_CONFIG_FILE, ReplicationPeerConfigUtil.toByteArray(peerConfig));
111    } catch (IOException e) {
112      throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfig=>"
113        + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
114    }
115  }
116
117  @Override
118  public void removePeer(String peerId) throws ReplicationException {
119    // delete the peer config first, and then delete the directory
120    // we will consider this is not a valid peer by reading the peer config file
121    Path peerDir = getPeerDir(peerId);
122    try {
123      delete(fs, peerDir, PEER_CONFIG_FILE);
124      if (!fs.delete(peerDir, true)) {
125        throw new IOException("Can not delete " + peerDir);
126      }
127    } catch (IOException e) {
128      throw new ReplicationException("Could not remove peer with id=" + peerId, e);
129    }
130  }
131
132  @Override
133  public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
134    Path disabledFile = new Path(getPeerDir(peerId), DISABLED_FILE);
135    try {
136      if (enabled) {
137        if (fs.exists(disabledFile) && !fs.delete(disabledFile, false)) {
138          throw new IOException("Can not delete " + disabledFile);
139        }
140      } else {
141        if (!fs.exists(disabledFile) && !fs.createNewFile(disabledFile)) {
142          throw new IOException("Can not touch " + disabledFile);
143        }
144      }
145    } catch (IOException e) {
146      throw new ReplicationException(
147        "Unable to change state of the peer with id=" + peerId + " to " + enabled, e);
148    }
149  }
150
151  @Override
152  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
153    throws ReplicationException {
154    Path peerDir = getPeerDir(peerId);
155    try {
156      write(fs, peerDir, PEER_CONFIG_FILE, ReplicationPeerConfigUtil.toByteArray(peerConfig));
157    } catch (IOException e) {
158      throw new ReplicationException(
159        "There was a problem trying to save changes to the " + "replication peer " + peerId, e);
160    }
161  }
162
163  @Override
164  public List<String> listPeerIds() throws ReplicationException {
165    try {
166      FileStatus[] statuses = fs.listStatus(dir);
167      if (statuses == null || statuses.length == 0) {
168        return Collections.emptyList();
169      }
170      List<String> peerIds = new ArrayList<>();
171      for (FileStatus status : statuses) {
172        String peerId = status.getPath().getName();
173        Path peerDir = getPeerDir(peerId);
174        // confirm that this is a valid peer
175        byte[] peerConfigData = read(fs, peerDir, PEER_CONFIG_FILE);
176        if (peerConfigData != null) {
177          peerIds.add(peerId);
178        }
179      }
180      return Collections.unmodifiableList(peerIds);
181    } catch (FileNotFoundException e) {
182      LOG.debug("Peer directory does not exist yet", e);
183      return Collections.emptyList();
184    } catch (IOException e) {
185      throw new ReplicationException("Cannot get the list of peers", e);
186    }
187  }
188
189  @Override
190  public boolean isPeerEnabled(String peerId) throws ReplicationException {
191    Path disabledFile = new Path(getPeerDir(peerId), DISABLED_FILE);
192    try {
193      return !fs.exists(disabledFile);
194    } catch (IOException e) {
195      throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e);
196    }
197  }
198
199  @Override
200  public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException {
201    Path peerDir = getPeerDir(peerId);
202    byte[] data;
203    try {
204      data = read(fs, peerDir, PEER_CONFIG_FILE);
205    } catch (IOException e) {
206      throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
207    }
208    if (data == null || data.length == 0) {
209      throw new ReplicationException(
210        "Replication peer config data shouldn't be empty, peerId=" + peerId);
211    }
212    try {
213      return ReplicationPeerConfigUtil.parsePeerFrom(data);
214    } catch (DeserializationException e) {
215      throw new ReplicationException(
216        "Failed to parse replication peer config for peer with id=" + peerId, e);
217    }
218  }
219
220  // 16 MB is big enough for our usage here
221  private static final long MAX_FILE_SIZE = 16 * 1024 * 1024;
222
223  private static byte[] read(FileSystem fs, Path dir, String name) throws IOException {
224    RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE);
225    return file.read();
226  }
227
228  private static void write(FileSystem fs, Path dir, String name, byte[] data) throws IOException {
229    RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE);
230    // to initialize the nextFile index
231    file.read();
232    file.write(data);
233  }
234
235  private static void delete(FileSystem fs, Path dir, String name) throws IOException {
236    RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE);
237    // to initialize the nextFile index
238    file.read();
239    file.delete();
240  }
241}