001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.security.PrivilegedExceptionAction;
027import java.util.Arrays;
028import java.util.List;
029import java.util.Map;
030import java.util.NavigableMap;
031import java.util.TreeMap;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.CellUtil;
037import org.apache.hadoop.hbase.Coprocessor;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.RegionInfoBuilder;
049import org.apache.hadoop.hbase.client.TableDescriptor;
050import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
051import org.apache.hadoop.hbase.regionserver.HRegion;
052import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
053import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
054import org.apache.hadoop.hbase.security.User;
055import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.CommonFSUtils;
059import org.apache.hadoop.hbase.util.EnvironmentEdge;
060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
061import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
062import org.apache.hadoop.hbase.wal.WAL;
063import org.apache.hadoop.hbase.wal.WALEdit;
064import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
065import org.apache.hadoop.hbase.wal.WALFactory;
066import org.apache.hadoop.hbase.wal.WALKeyImpl;
067import org.apache.hadoop.hbase.wal.WALSplitter;
068import org.junit.After;
069import org.junit.AfterClass;
070import org.junit.Before;
071import org.junit.BeforeClass;
072import org.junit.ClassRule;
073import org.junit.Rule;
074import org.junit.Test;
075import org.junit.experimental.categories.Category;
076import org.junit.rules.TestName;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
/**
 * Tests invocation of the {@link org.apache.hadoop.hbase.coprocessor.WALObserver} interface hooks
 * at all appropriate times during normal WAL operations (append, roll and replay).
 */
084@Category({ CoprocessorTests.class, MediumTests.class })
085public class TestWALObserver {
086
087  @ClassRule
088  public static final HBaseClassTestRule CLASS_RULE =
089    HBaseClassTestRule.forClass(TestWALObserver.class);
090
091  private static final Logger LOG = LoggerFactory.getLogger(TestWALObserver.class);
092  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
093
094  private static byte[] TEST_TABLE = Bytes.toBytes("observedTable");
095  private static byte[][] TEST_FAMILY =
096    { Bytes.toBytes("fam1"), Bytes.toBytes("fam2"), Bytes.toBytes("fam3"), };
097  private static byte[][] TEST_QUALIFIER =
098    { Bytes.toBytes("q1"), Bytes.toBytes("q2"), Bytes.toBytes("q3"), };
099  private static byte[][] TEST_VALUE =
100    { Bytes.toBytes("v1"), Bytes.toBytes("v2"), Bytes.toBytes("v3"), };
101  private static byte[] TEST_ROW = Bytes.toBytes("testRow");
102
103  @Rule
104  public TestName currentTest = new TestName();
105
106  private Configuration conf;
107  private FileSystem fs;
108  private Path hbaseRootDir;
109  private Path hbaseWALRootDir;
110  private Path oldLogDir;
111  private Path logDir;
112  private WALFactory wals;
113
114  @BeforeClass
115  public static void setupBeforeClass() throws Exception {
116    Configuration conf = TEST_UTIL.getConfiguration();
117    conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
118      SampleRegionWALCoprocessor.class.getName());
119    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
120      SampleRegionWALCoprocessor.class.getName());
121    conf.setInt("dfs.client.block.recovery.retries", 2);
122
123    TEST_UTIL.startMiniCluster(1);
124    Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
125    Path hbaseWALRootDir =
126      TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbaseLogRoot"));
127    LOG.info("hbase.rootdir=" + hbaseRootDir);
128    CommonFSUtils.setRootDir(conf, hbaseRootDir);
129    CommonFSUtils.setWALRootDir(conf, hbaseWALRootDir);
130  }
131
132  @AfterClass
133  public static void teardownAfterClass() throws Exception {
134    TEST_UTIL.shutdownMiniCluster();
135  }
136
137  @Before
138  public void setUp() throws Exception {
139    this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
140    // this.cluster = TEST_UTIL.getDFSCluster();
141    this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
142    this.hbaseRootDir = CommonFSUtils.getRootDir(conf);
143    this.hbaseWALRootDir = CommonFSUtils.getWALRootDir(conf);
144    this.oldLogDir = new Path(this.hbaseWALRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
145    String serverName = ServerName
146      .valueOf(currentTest.getMethodName(), 16010, EnvironmentEdgeManager.currentTime()).toString();
147    this.logDir =
148      new Path(this.hbaseWALRootDir, AbstractFSWALProvider.getWALDirectoryName(serverName));
149
150    if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
151      TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
152    }
153    if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseWALRootDir)) {
154      TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
155    }
156    this.wals = new WALFactory(conf, serverName);
157  }
158
159  @After
160  public void tearDown() throws Exception {
161    try {
162      wals.shutdown();
163    } catch (IOException exception) {
164      // one of our tests splits out from under our wals.
165      LOG.warn("Ignoring failure to close wal factory. " + exception.getMessage());
166      LOG.debug("details of failure to close wal factory.", exception);
167    }
168    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
169    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
170  }
171
172  /**
173   * Test WAL write behavior with WALObserver. The coprocessor monitors a WALEdit written to WAL,
174   * and ignore, modify, and add KeyValue's for the WALEdit.
175   */
176  @Test
177  public void testWALObserverWriteToWAL() throws Exception {
178    final WAL log = wals.getWAL(null);
179    verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALCoprocessor.class), false);
180  }
181
182  private void verifyWritesSeen(final WAL log, final SampleRegionWALCoprocessor cp,
183    final boolean seesLegacy) throws Exception {
184    RegionInfo hri = createBasicHRegionInfo(Bytes.toString(TEST_TABLE));
185    TableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
186    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
187    for (byte[] fam : htd.getColumnFamilyNames()) {
188      scopes.put(fam, 0);
189    }
190    Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
191    deleteDir(basedir);
192    fs.mkdirs(new Path(basedir, hri.getEncodedName()));
193
194    // TEST_FAMILY[0] shall be removed from WALEdit.
195    // TEST_FAMILY[1] value shall be changed.
196    // TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put.
197    cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0], TEST_FAMILY[1],
198      TEST_QUALIFIER[1], TEST_FAMILY[2], TEST_QUALIFIER[2]);
199
200    assertFalse(cp.isPreWALWriteCalled());
201    assertFalse(cp.isPostWALWriteCalled());
202
203    // TEST_FAMILY[2] is not in the put, however it shall be added by the tested
204    // coprocessor.
205    // Use a Put to create familyMap.
206    Put p = creatPutWith2Families(TEST_ROW);
207
208    Map<byte[], List<Cell>> familyMap = p.getFamilyCellMap();
209    WALEdit edit = new WALEdit();
210    edit.add(familyMap);
211
212    boolean foundFamily0 = false;
213    boolean foundFamily2 = false;
214    boolean modifiedFamily1 = false;
215
216    List<Cell> cells = edit.getCells();
217
218    for (Cell cell : cells) {
219      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[0])) {
220        foundFamily0 = true;
221      }
222      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[2])) {
223        foundFamily2 = true;
224      }
225      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[1])) {
226        if (!Arrays.equals(CellUtil.cloneValue(cell), TEST_VALUE[1])) {
227          modifiedFamily1 = true;
228        }
229      }
230    }
231    assertTrue(foundFamily0);
232    assertFalse(foundFamily2);
233    assertFalse(modifiedFamily1);
234
235    // it's where WAL write cp should occur.
236    long now = EnvironmentEdgeManager.currentTime();
237    // we use HLogKey here instead of WALKeyImpl directly to support legacy coprocessors.
238    long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now,
239      new MultiVersionConcurrencyControl(), scopes), edit);
240    log.sync(txid);
241
242    // the edit shall have been change now by the coprocessor.
243    foundFamily0 = false;
244    foundFamily2 = false;
245    modifiedFamily1 = false;
246    for (Cell cell : cells) {
247      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[0])) {
248        foundFamily0 = true;
249      }
250      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[2])) {
251        foundFamily2 = true;
252      }
253      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[1])) {
254        if (!Arrays.equals(CellUtil.cloneValue(cell), TEST_VALUE[1])) {
255          modifiedFamily1 = true;
256        }
257      }
258    }
259    assertFalse(foundFamily0);
260    assertTrue(foundFamily2);
261    assertTrue(modifiedFamily1);
262
263    assertTrue(cp.isPreWALWriteCalled());
264    assertTrue(cp.isPostWALWriteCalled());
265  }
266
267  /**
268   * Coprocessors shouldn't get notice of empty waledits.
269   */
270  @Test
271  public void testEmptyWALEditAreNotSeen() throws Exception {
272    RegionInfo hri = createBasicHRegionInfo(Bytes.toString(TEST_TABLE));
273    TableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
274    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
275    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
276    for (byte[] fam : htd.getColumnFamilyNames()) {
277      scopes.put(fam, 0);
278    }
279    WAL log = wals.getWAL(null);
280    try {
281      SampleRegionWALCoprocessor cp = getCoprocessor(log, SampleRegionWALCoprocessor.class);
282
283      cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null);
284
285      assertFalse(cp.isPreWALWriteCalled());
286      assertFalse(cp.isPostWALWriteCalled());
287
288      final long now = EnvironmentEdgeManager.currentTime();
289      long txid = log.appendData(hri,
290        new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes),
291        new WALEdit());
292      log.sync(txid);
293
294      assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled());
295      assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPostWALWriteCalled());
296    } finally {
297      log.close();
298    }
299  }
300
301  /**
302   * Test WAL replay behavior with WALObserver.
303   */
304  @Test
305  public void testWALCoprocessorReplay() throws Exception {
306    // WAL replay is handled at HRegion::replayRecoveredEdits(), which is
307    // ultimately called by HRegion::initialize()
308    TableName tableName = TableName.valueOf(currentTest.getMethodName());
309    TableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName);
310    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
311    // final HRegionInfo hri =
312    // createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
313    // final HRegionInfo hri1 =
314    // createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
315    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
316
317    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
318    deleteDir(basedir);
319    fs.mkdirs(new Path(basedir, hri.getEncodedName()));
320
321    final Configuration newConf = HBaseConfiguration.create(this.conf);
322
323    // WAL wal = new WAL(this.fs, this.dir, this.oldLogDir, this.conf);
324    WAL wal = wals.getWAL(null);
325    // Put p = creatPutWith2Families(TEST_ROW);
326    WALEdit edit = new WALEdit();
327    long now = EnvironmentEdgeManager.currentTime();
328    final int countPerFamily = 1000;
329    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
330    for (byte[] fam : htd.getColumnFamilyNames()) {
331      scopes.put(fam, 0);
332    }
333    for (byte[] fam : htd.getColumnFamilyNames()) {
334      addWALEdits(tableName, hri, TEST_ROW, fam, countPerFamily,
335        EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc);
336    }
337    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
338      edit);
339    // sync to fs.
340    wal.sync();
341
342    User user = HBaseTestingUtil.getDifferentUser(newConf, ".replay.wal.secondtime");
343    user.runAs(new PrivilegedExceptionAction<Void>() {
344      @Override
345      public Void run() throws Exception {
346        Path p = runWALSplit(newConf);
347        LOG.info("WALSplit path == " + p);
348        // Make a new wal for new region open.
349        final WALFactory wals2 = new WALFactory(conf,
350          ServerName
351            .valueOf(currentTest.getMethodName() + "2", 16010, EnvironmentEdgeManager.currentTime())
352            .toString());
353        WAL wal2 = wals2.getWAL(null);
354        HRegion region = HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri,
355          htd, wal2, TEST_UTIL.getHBaseCluster().getRegionServer(0), null);
356
357        SampleRegionWALCoprocessor cp2 =
358          region.getCoprocessorHost().findCoprocessor(SampleRegionWALCoprocessor.class);
359        assertNotNull(cp2);
360        assertTrue(cp2.isPreReplayWALsCalled());
361        assertTrue(cp2.isPostReplayWALsCalled());
362        region.close();
363        wals2.close();
364        return null;
365      }
366    });
367  }
368
369  /**
370   * Test to see CP loaded successfully or not. There is a duplication at TestHLog, but the purpose
371   * of that one is to see whether the loaded CP will impact existing WAL tests or not.
372   */
373  @Test
374  public void testWALObserverLoaded() throws Exception {
375    WAL log = wals.getWAL(null);
376    assertNotNull(getCoprocessor(log, SampleRegionWALCoprocessor.class));
377  }
378
379  @Test
380  public void testWALObserverRoll() throws Exception {
381    final WAL wal = wals.getWAL(null);
382    final SampleRegionWALCoprocessor cp = getCoprocessor(wal, SampleRegionWALCoprocessor.class);
383    cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null);
384
385    assertFalse(cp.isPreWALRollCalled());
386    assertFalse(cp.isPostWALRollCalled());
387
388    wal.rollWriter(true);
389    assertTrue(cp.isPreWALRollCalled());
390    assertTrue(cp.isPostWALRollCalled());
391  }
392
393  private SampleRegionWALCoprocessor getCoprocessor(WAL wal,
394    Class<? extends SampleRegionWALCoprocessor> clazz) throws Exception {
395    WALCoprocessorHost host = wal.getCoprocessorHost();
396    Coprocessor c = host.findCoprocessor(clazz.getName());
397    return (SampleRegionWALCoprocessor) c;
398  }
399
400  /**
401   * Creates an HRI around an HTD that has <code>tableName</code>.
402   * @param tableName Name of table to use.
403   */
404  private RegionInfo createBasicHRegionInfo(String tableName) {
405    return RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
406  }
407
408  /*
409   * @param p Directory to cleanup
410   */
411  private void deleteDir(final Path p) throws IOException {
412    if (this.fs.exists(p)) {
413      if (!this.fs.delete(p, true)) {
414        throw new IOException("Failed remove of " + p);
415      }
416    }
417  }
418
419  private Put creatPutWith2Families(byte[] row) throws IOException {
420    Put p = new Put(row);
421    for (int i = 0; i < TEST_FAMILY.length - 1; i++) {
422      p.addColumn(TEST_FAMILY[i], TEST_QUALIFIER[i], TEST_VALUE[i]);
423    }
424    return p;
425  }
426
427  private Path runWALSplit(final Configuration c) throws IOException {
428    List<Path> splits =
429      WALSplitter.split(hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
430    // Split should generate only 1 file since there's only 1 region
431    assertEquals(1, splits.size());
432    // Make sure the file exists
433    assertTrue(fs.exists(splits.get(0)));
434    LOG.info("Split file=" + splits.get(0));
435    return splits.get(0);
436  }
437
438  private void addWALEdits(final TableName tableName, final RegionInfo hri, final byte[] rowName,
439    final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
440    final NavigableMap<byte[], Integer> scopes, final MultiVersionConcurrencyControl mvcc)
441    throws IOException {
442    String familyStr = Bytes.toString(family);
443    long txid = -1;
444    for (int j = 0; j < count; j++) {
445      byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
446      byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
447      WALEdit edit = new WALEdit();
448      WALEditInternalHelper.addExtendedCell(edit,
449        new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
450      // uses WALKeyImpl instead of HLogKey on purpose. will only work for tests where we don't care
451      // about legacy coprocessors
452      txid = wal.appendData(hri,
453        new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit);
454    }
455    if (-1 != txid) {
456      wal.sync(txid);
457    }
458  }
459
460  private TableDescriptor getBasic3FamilyHTableDescriptor(TableName tableName) {
461    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
462    Arrays.stream(TEST_FAMILY).map(ColumnFamilyDescriptorBuilder::of)
463      .forEachOrdered(builder::setColumnFamily);
464    return builder.build();
465  }
466
467  private TableDescriptor createBasic3FamilyHTD(String tableName) {
468    return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
469      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("a"))
470      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("b"))
471      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("c")).build();
472  }
473}