001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
021import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.concurrent.locks.Lock;
029import java.util.stream.Collectors;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Abortable;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.KeyLocker;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * A WAL Provider that returns a WAL per group of regions. This provider follows the decorator
043 * pattern and mainly holds the logic for WAL grouping. WAL creation/roll/close is delegated to
044 * {@link #DELEGATE_PROVIDER} Region grouping is handled via {@link RegionGroupingStrategy} and can
045 * be configured via the property "hbase.wal.regiongrouping.strategy". Current strategy choices are
046 * <ul>
047 * <li><em>defaultStrategy</em> : Whatever strategy this version of HBase picks. currently
048 * "bounded".</li>
049 * <li><em>identity</em> : each region belongs to its own group.</li>
050 * <li><em>bounded</em> : bounded number of groups and region evenly assigned to each group.</li>
051 * </ul>
052 * Optionally, a FQCN to a custom implementation may be given.
053 */
054@InterfaceAudience.Private
055public class RegionGroupingProvider implements WALProvider {
056  private static final Logger LOG = LoggerFactory.getLogger(RegionGroupingProvider.class);
057
058  /**
059   * Map identifiers to a group number.
060   */
061  public static interface RegionGroupingStrategy {
062    String GROUP_NAME_DELIMITER = ".";
063
064    /**
065     * Given an identifier and a namespace, pick a group.
066     */
067    String group(final byte[] identifier, byte[] namespace);
068
069    void init(Configuration config, String providerId);
070  }
071
072  /**
073   * Maps between configuration names for strategies and implementation classes.
074   */
075  static enum Strategies {
076    defaultStrategy(BoundedGroupingStrategy.class),
077    identity(IdentityGroupingStrategy.class),
078    bounded(BoundedGroupingStrategy.class),
079    namespace(NamespaceGroupingStrategy.class);
080
081    final Class<? extends RegionGroupingStrategy> clazz;
082
083    Strategies(Class<? extends RegionGroupingStrategy> clazz) {
084      this.clazz = clazz;
085    }
086  }
087
088  /**
089   * instantiate a strategy from a config property. requires conf to have already been set (as well
090   * as anything the provider might need to read).
091   */
092  RegionGroupingStrategy getStrategy(final Configuration conf, final String key,
093    final String defaultValue) throws IOException {
094    Class<? extends RegionGroupingStrategy> clazz;
095    try {
096      clazz = Strategies.valueOf(conf.get(key, defaultValue)).clazz;
097    } catch (IllegalArgumentException exception) {
098      // Fall back to them specifying a class name
099      // Note that the passed default class shouldn't actually be used, since the above only fails
100      // when there is a config value present.
101      clazz = conf.getClass(key, IdentityGroupingStrategy.class, RegionGroupingStrategy.class);
102    }
103    LOG.info("Instantiating RegionGroupingStrategy of type " + clazz);
104    try {
105      final RegionGroupingStrategy result = clazz.getDeclaredConstructor().newInstance();
106      result.init(conf, providerId);
107      return result;
108    } catch (Exception e) {
109      LOG.error(
110        "couldn't set up region grouping strategy, check config key " + REGION_GROUPING_STRATEGY);
111      LOG.debug("Exception details for failure to load region grouping strategy.", e);
112      throw new IOException("couldn't set up region grouping strategy", e);
113    }
114  }
115
116  public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
117  public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
118
119  /** delegate provider for WAL creation/roll/close, but not support multiwal */
120  public static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate.provider";
121  public static final String DEFAULT_DELEGATE_PROVIDER =
122    WALFactory.Providers.defaultProvider.name();
123
124  private static final String META_WAL_GROUP_NAME = "meta";
125
126  /** A group-provider mapping, make sure one-one rather than many-one mapping */
127  private final ConcurrentMap<String, WALProvider> cached = new ConcurrentHashMap<>();
128
129  private final KeyLocker<String> createLock = new KeyLocker<>();
130
131  private RegionGroupingStrategy strategy;
132  private WALFactory factory;
133  private List<WALActionsListener> listeners = new ArrayList<>();
134  private String providerId;
135  private Class<? extends WALProvider> providerClass;
136
137  @Override
138  public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
139    throws IOException {
140    if (null != strategy) {
141      throw new IllegalStateException("WALProvider.init should only be called once.");
142    }
143    this.factory = factory;
144
145    if (META_WAL_PROVIDER_ID.equals(providerId)) {
146      // do not change the provider id if it is for meta
147      this.providerId = providerId;
148    } else {
149      StringBuilder sb = new StringBuilder().append(factory.factoryId);
150      if (providerId != null) {
151        if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
152          sb.append(providerId);
153        } else {
154          sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
155        }
156      }
157      this.providerId = sb.toString();
158    }
159    this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
160    this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER);
161    if (providerClass.equals(this.getClass())) {
162      LOG.warn("delegate provider not support multiwal, falling back to defaultProvider.");
163      providerClass = factory.getDefaultProvider().clazz;
164    }
165  }
166
167  private WALProvider createProvider(String group) throws IOException {
168    if (META_WAL_PROVIDER_ID.equals(providerId)) {
169      return factory.createProvider(providerClass, META_WAL_PROVIDER_ID);
170    } else {
171      return factory.createProvider(providerClass, group);
172    }
173  }
174
175  @Override
176  public List<WAL> getWALs() {
177    return cached.values().stream().flatMap(p -> p.getWALs().stream()).collect(Collectors.toList());
178  }
179
180  private WAL getWAL(String group) throws IOException {
181    WALProvider provider = cached.get(group);
182    if (provider == null) {
183      Lock lock = createLock.acquireLock(group);
184      try {
185        provider = cached.get(group);
186        if (provider == null) {
187          provider = createProvider(group);
188          listeners.forEach(provider::addWALActionsListener);
189          cached.put(group, provider);
190        }
191      } finally {
192        lock.unlock();
193      }
194    }
195    return provider.getWAL(null);
196  }
197
198  @Override
199  public WAL getWAL(RegionInfo region) throws IOException {
200    String group;
201    if (META_WAL_PROVIDER_ID.equals(this.providerId)) {
202      group = META_WAL_GROUP_NAME;
203    } else {
204      byte[] id;
205      byte[] namespace;
206      if (region != null) {
207        id = region.getEncodedNameAsBytes();
208        namespace = region.getTable().getNamespace();
209      } else {
210        id = HConstants.EMPTY_BYTE_ARRAY;
211        namespace = null;
212      }
213      group = strategy.group(id, namespace);
214    }
215    return getWAL(group);
216  }
217
218  @Override
219  public void shutdown() throws IOException {
220    // save the last exception and rethrow
221    IOException failure = null;
222    for (WALProvider provider : cached.values()) {
223      try {
224        provider.shutdown();
225      } catch (IOException e) {
226        LOG.error("Problem shutting down wal provider '" + provider + "': " + e.getMessage());
227        if (LOG.isDebugEnabled()) {
228          LOG.debug("Details of problem shutting down wal provider '" + provider + "'", e);
229        }
230        failure = e;
231      }
232    }
233    if (failure != null) {
234      throw failure;
235    }
236  }
237
238  @Override
239  public void close() throws IOException {
240    // save the last exception and rethrow
241    IOException failure = null;
242    for (WALProvider provider : cached.values()) {
243      try {
244        provider.close();
245      } catch (IOException e) {
246        LOG.error("Problem closing wal provider '" + provider + "': " + e.getMessage());
247        if (LOG.isDebugEnabled()) {
248          LOG.debug("Details of problem closing wal provider '" + provider + "'", e);
249        }
250        failure = e;
251      }
252    }
253    if (failure != null) {
254      throw failure;
255    }
256  }
257
258  static class IdentityGroupingStrategy implements RegionGroupingStrategy {
259    @Override
260    public void init(Configuration config, String providerId) {
261    }
262
263    @Override
264    public String group(final byte[] identifier, final byte[] namespace) {
265      return Bytes.toString(identifier);
266    }
267  }
268
269  @Override
270  public long getNumLogFiles() {
271    long numLogFiles = 0;
272    for (WALProvider provider : cached.values()) {
273      numLogFiles += provider.getNumLogFiles();
274    }
275    return numLogFiles;
276  }
277
278  @Override
279  public long getLogFileSize() {
280    long logFileSize = 0;
281    for (WALProvider provider : cached.values()) {
282      logFileSize += provider.getLogFileSize();
283    }
284    return logFileSize;
285  }
286
287  @Override
288  public void addWALActionsListener(WALActionsListener listener) {
289    // Notice that there is an assumption that this method must be called before the getWAL above,
290    // so we can make sure there is no sub WALProvider yet, so we only add the listener to our
291    // listeners list without calling addWALActionListener for each WALProvider. Although it is no
292    // hurt to execute an extra loop to call addWALActionListener for each WALProvider, but if the
293    // extra code actually works, then we will have other big problems. So leave it as is.
294    listeners.add(listener);
295  }
296}