001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.Future; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.io.MultipleIOException; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037/** 038 * Class that manages the output streams from the log splitting process. Every region may have many 039 * recovered edits file. But the opening writers is bounded. Bounded means the output streams will 040 * be no more than the size of threadpool. 041 */ 042@InterfaceAudience.Private 043class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink { 044 private static final Logger LOG = LoggerFactory.getLogger(BoundedRecoveredEditsOutputSink.class); 045 046 // Since the splitting process may create multiple output files, we need a map 047 // to track the output count of each region. 048 private ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>(); 049 // Need a counter to track the opening writers. 050 private final AtomicInteger openingWritersNum = new AtomicInteger(0); 051 052 public BoundedRecoveredEditsOutputSink(WALSplitter walSplitter, 053 WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { 054 super(walSplitter, controller, entryBuffers, numWriters); 055 } 056 057 @Override 058 public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException { 059 List<WAL.Entry> entries = buffer.entries; 060 if (entries.isEmpty()) { 061 LOG.warn("got an empty buffer, skipping"); 062 return; 063 } 064 // The key point is create a new writer, write edits then close writer. 065 RecoveredEditsWriter writer = createRecoveredEditsWriter(buffer.tableName, 066 buffer.encodedRegionName, entries.get(0).getKey().getSequenceId()); 067 if (writer != null) { 068 openingWritersNum.incrementAndGet(); 069 writer.writeRegionEntries(entries); 070 regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName), 071 (k, v) -> v == null ? writer.editsWritten : v + writer.editsWritten); 072 List<IOException> thrown = new ArrayList<>(); 073 Path dst = closeRecoveredEditsWriter(writer, thrown); 074 splits.add(dst); 075 openingWritersNum.decrementAndGet(); 076 if (!thrown.isEmpty()) { 077 throw MultipleIOException.createIOException(thrown); 078 } 079 } 080 } 081 082 @Override 083 public List<Path> close() throws IOException { 084 boolean isSuccessful = true; 085 try { 086 isSuccessful = finishWriterThreads(false); 087 } finally { 088 isSuccessful &= writeRemainingEntryBuffers(); 089 } 090 return isSuccessful ? splits : null; 091 } 092 093 /** 094 * Write out the remaining RegionEntryBuffers and close the writers. 095 * @return true when there is no error. 096 */ 097 private boolean writeRemainingEntryBuffers() throws IOException { 098 for (EntryBuffers.RegionEntryBuffer buffer : entryBuffers.buffers.values()) { 099 closeCompletionService.submit(() -> { 100 append(buffer); 101 return null; 102 }); 103 } 104 boolean progressFailed = false; 105 try { 106 for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) { 107 Future<Void> future = closeCompletionService.take(); 108 future.get(); 109 if (!progressFailed && reporter != null && !reporter.progress()) { 110 progressFailed = true; 111 } 112 } 113 } catch (InterruptedException e) { 114 IOException iie = new InterruptedIOException(); 115 iie.initCause(e); 116 throw iie; 117 } catch (ExecutionException e) { 118 throw new IOException(e.getCause()); 119 } finally { 120 closeThreadPool.shutdownNow(); 121 } 122 return !progressFailed; 123 } 124 125 @Override 126 public Map<String, Long> getOutputCounts() { 127 return regionEditsWrittenMap; 128 } 129 130 @Override 131 public int getNumberOfRecoveredRegions() { 132 return regionEditsWrittenMap.size(); 133 } 134 135 @Override 136 public int getNumOpenWriters() { 137 return openingWritersNum.get(); 138 } 139}