/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CellSet;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A WALSplitter sink that outputs {@link org.apache.hadoop.hbase.io.hfile.HFile}s. Runs with a
 * bounded number of HFile writers at any one time rather than let the count run up.
 * @see BoundedRecoveredEditsOutputSink for a sink implementation that writes intermediate
 *      recovered.edits files.
 */
@InterfaceAudience.Private
public class BoundedRecoveredHFilesOutputSink extends OutputSink {
  private static final Logger LOG =
    LoggerFactory.getLogger(BoundedRecoveredHFilesOutputSink.class);

  private final WALSplitter walSplitter;

  // Since the splitting process may create multiple output files, we need a map
  // to track the output count of each region.
  private ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
  // Need a counter to track the opening writers.
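  // It is bumped when a recovered hfile writer is created in append() and dropped once that
  // writer's cells have been written out, so getNumOpenWriters() reflects in-flight writers.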
  private final AtomicInteger openingWritersNum = new AtomicInteger(0);

  public BoundedRecoveredHFilesOutputSink(WALSplitter walSplitter,
    WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
    super(controller, entryBuffers, numWriters);
    this.walSplitter = walSplitter;
  }

  @Override
  public void append(RegionEntryBuffer buffer) throws IOException {
    Map<String, CellSet> familyCells = new HashMap<>();
    Map<String, Long> familySeqIds = new HashMap<>();
    boolean isMetaTable = buffer.tableName.equals(META_TABLE_NAME);
    // First iterate all Cells to find which column families are present and to stamp each Cell
    // with its sequence id.
    for (WAL.Entry entry : buffer.entries) {
      long seqId = entry.getKey().getSequenceId();
      List<Cell> cells = entry.getEdit().getCells();
      for (Cell cell : cells) {
        if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
          continue;
        }
        PrivateCellUtil.setSequenceId(cell, seqId);
        String familyName = Bytes.toString(CellUtil.cloneFamily(cell));
        // The comparator needs to be specified for meta.
        familyCells
          .computeIfAbsent(familyName,
            key -> new CellSet(
              isMetaTable ? MetaCellComparator.META_COMPARATOR : CellComparatorImpl.COMPARATOR))
          .add(cell);
        familySeqIds.compute(familyName, (k, v) -> v == null ? seqId : Math.max(v, seqId));
      }
    }

    // Create a new hfile writer for each column family, write the edits, then close the writer.
    String regionName = Bytes.toString(buffer.encodedRegionName);
    for (Map.Entry<String, CellSet> cellsEntry : familyCells.entrySet()) {
      String familyName = cellsEntry.getKey();
      StoreFileWriter writer = createRecoveredHFileWriter(buffer.tableName, regionName,
        familySeqIds.get(familyName), familyName, isMetaTable);
      LOG.trace("Created {}", writer.getPath());
      openingWritersNum.incrementAndGet();
      try {
        for (Cell cell : cellsEntry.getValue()) {
          writer.append(cell);
        }
        // Append the max seqid to the hfile; it is used during recovery.
        writer.appendMetadata(familySeqIds.get(familyName), false);
        regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName),
          (k, v) -> v == null ? buffer.entries.size() : v + buffer.entries.size());
        splits.add(writer.getPath());
        openingWritersNum.decrementAndGet();
      } finally {
        writer.close();
        LOG.trace("Closed {}, edits={}", writer.getPath(), familyCells.size());
      }
    }
  }

  @Override
  public List<Path> close() throws IOException {
    boolean isSuccessful = true;
    try {
      isSuccessful = finishWriterThreads(false);
    } finally {
      isSuccessful &= writeRemainingEntryBuffers();
    }
    return isSuccessful ? splits : null;
  }

  /**
   * Write out the remaining RegionEntryBuffers and close the writers.
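   * Each remaining buffer is appended on the close thread pool; after every completed task the
   * progress reporter is polled so a stalled split can be flagged as failed.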
   * @return true when there is no error.
   */
  private boolean writeRemainingEntryBuffers() throws IOException {
    for (EntryBuffers.RegionEntryBuffer buffer : entryBuffers.buffers.values()) {
      closeCompletionService.submit(() -> {
        append(buffer);
        return null;
      });
    }
    boolean progressFailed = false;
    try {
      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
        Future<Void> future = closeCompletionService.take();
        future.get();
        if (!progressFailed && reporter != null && !reporter.progress()) {
          progressFailed = true;
        }
      }
    } catch (InterruptedException e) {
      IOException iie = new InterruptedIOException();
      iie.initCause(e);
      throw iie;
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    } finally {
      closeThreadPool.shutdownNow();
    }
    return !progressFailed;
  }

  @Override
  public Map<String, Long> getOutputCounts() {
    return regionEditsWrittenMap;
  }

  @Override
  public int getNumberOfRecoveredRegions() {
    return regionEditsWrittenMap.size();
  }

  @Override
  public int getNumOpenWriters() {
    return openingWritersNum.get();
  }

  @Override
  public boolean keepRegionEvent(Entry entry) {
    return false;
  }

  /**
   * @return A writer for a plain HFile with no compression or block encoding; good enough for
   *         recovery given the hfile carries metadata on how it was written.
   */
  private StoreFileWriter createRecoveredHFileWriter(TableName tableName, String regionName,
    long seqId, String familyName, boolean isMetaTable) throws IOException {
    Path outputDir = WALSplitUtil.tryCreateRecoveredHFilesDir(walSplitter.rootFS, walSplitter.conf,
      tableName, regionName, familyName);
    StoreFileWriter.Builder writerBuilder =
      new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, walSplitter.rootFS)
        .withOutputDir(outputDir);
    HFileContext hFileContext =
      new HFileContextBuilder().withChecksumType(StoreUtils.getChecksumType(walSplitter.conf))
        .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(walSplitter.conf))
        .withCellComparator(
          isMetaTable ? MetaCellComparator.META_COMPARATOR : CellComparatorImpl.COMPARATOR)
        .build();
    return writerBuilder.withFileContext(hFileContext).build();
  }
}