001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.concurrent.CompletableFuture; 023import java.util.concurrent.CountDownLatch; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.hadoop.hbase.client.RegionInfo; 029import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 030import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; 031import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; 032import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 033import org.apache.hadoop.hbase.util.CommonFSUtils; 034import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; 035import org.apache.hadoop.hbase.wal.WAL; 036import org.apache.hadoop.hbase.wal.WALProvider; 037 038import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 039import org.apache.hbase.thirdparty.io.netty.channel.Channel; 040import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 041 042public class BrokenRemoteAsyncFSWALProvider extends AsyncFSWALProvider { 043 044 static class BrokenRemoteAsyncFSWAL extends AsyncFSWAL { 045 046 private final class MyCombinedAsyncWriter implements WALProvider.AsyncWriter { 047 048 private final WALProvider.AsyncWriter localWriter; 049 050 private final WALProvider.AsyncWriter remoteWriter; 051 052 // remoteWriter on the first 053 public MyCombinedAsyncWriter(WALProvider.AsyncWriter localWriter, 054 WALProvider.AsyncWriter remoteWriter) { 055 this.localWriter = localWriter; 056 this.remoteWriter = remoteWriter; 057 } 058 059 @Override 060 public long getLength() { 061 return localWriter.getLength(); 062 } 063 064 @Override 065 public long getSyncedLength() { 066 return this.localWriter.getSyncedLength(); 067 } 068 069 @Override 070 public void close() throws IOException { 071 Closeables.close(localWriter, true); 072 Closeables.close(remoteWriter, true); 073 } 074 075 @Override 076 public CompletableFuture<Long> sync(boolean forceSync) { 077 CompletableFuture<Long> localFuture; 078 CompletableFuture<Long> remoteFuture; 079 080 if (!localBroken) { 081 localFuture = localWriter.sync(forceSync); 082 } else { 083 localFuture = new CompletableFuture<>(); 084 localFuture.completeExceptionally(new IOException("Inject error")); 085 } 086 if (!remoteBroken) { 087 remoteFuture = remoteWriter.sync(forceSync); 088 } else { 089 remoteFuture = new CompletableFuture<>(); 090 remoteFuture.completeExceptionally(new IOException("Inject error")); 091 } 092 return CompletableFuture.allOf(localFuture, remoteFuture).thenApply(v -> { 093 return localFuture.getNow(0L); 094 }); 095 } 096 097 @Override 098 public void append(WAL.Entry entry) { 099 if (!localBroken) { 100 localWriter.append(entry); 101 } 102 if (!remoteBroken) { 103 remoteWriter.append(entry); 104 } 105 } 106 } 107 108 private volatile boolean localBroken; 109 110 private volatile boolean remoteBroken; 111 112 private CountDownLatch arrive; 113 114 private CountDownLatch resume; 115 116 public void setLocalBroken() { 117 this.localBroken = true; 118 } 119 120 public void setRemoteBroken() { 121 this.remoteBroken = true; 122 } 123 124 public void suspendLogRoll() { 125 arrive = new CountDownLatch(1); 126 resume = new CountDownLatch(1); 127 } 128 129 public void waitUntilArrive() throws InterruptedException { 130 arrive.await(); 131 } 132 133 public void resumeLogRoll() { 134 resume.countDown(); 135 } 136 137 public BrokenRemoteAsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir, 138 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 139 boolean failIfWALExists, String prefix, String suffix, FileSystem remoteFs, Path remoteWALDir, 140 EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, 141 StreamSlowMonitor monitor) throws FailedLogCloseException, IOException { 142 super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, 143 suffix, remoteFs, remoteWALDir, eventLoopGroup, channelClass, monitor); 144 } 145 146 @Override 147 protected WALProvider.AsyncWriter createCombinedWriter(WALProvider.AsyncWriter localWriter, 148 WALProvider.AsyncWriter remoteWriter) { 149 return new MyCombinedAsyncWriter(localWriter, remoteWriter); 150 } 151 152 @Override 153 protected WALProvider.AsyncWriter createWriterInstance(FileSystem fs, Path path) 154 throws IOException { 155 if (arrive != null) { 156 arrive.countDown(); 157 try { 158 resume.await(); 159 } catch (InterruptedException e) { 160 } 161 } 162 if (localBroken || remoteBroken) { 163 throw new IOException("WAL broken"); 164 } 165 return super.createWriterInstance(fs, path); 166 } 167 } 168 169 @Override 170 protected AsyncFSWAL createWAL() throws IOException { 171 return new BrokenRemoteAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable, 172 CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.getFactoryId()), 173 getWALArchiveDirectoryName(conf, factory.getFactoryId()), conf, listeners, true, logPrefix, 174 META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, null, null, 175 eventLoopGroup, channelClass, 176 factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId)); 177 178 } 179 180 @Override 181 protected WAL createRemoteWAL(RegionInfo region, FileSystem remoteFs, Path remoteWALDir, 182 String prefix, String suffix) throws IOException { 183 return new BrokenRemoteAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable, 184 CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.getFactoryId()), 185 getWALArchiveDirectoryName(conf, factory.getFactoryId()), conf, listeners, true, prefix, 186 suffix, remoteFs, remoteWALDir, eventLoopGroup, channelClass, 187 factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId)); 188 } 189 190}