001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertSame;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.util.Optional;
029import java.util.concurrent.CountDownLatch;
030import org.apache.hadoop.hbase.Cell.Type;
031import org.apache.hadoop.hbase.CellBuilderFactory;
032import org.apache.hadoop.hbase.CellBuilderType;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
041import org.apache.hadoop.hbase.coprocessor.ObserverContext;
042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
044import org.apache.hadoop.hbase.coprocessor.RegionObserver;
045import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.junit.After;
049import org.junit.AfterClass;
050import org.junit.Before;
051import org.junit.BeforeClass;
052import org.junit.ClassRule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055
056/**
057 * Confirm that the function of FlushLifeCycleTracker is OK as we do not use it in our own code.
058 */
059@Category({ CoprocessorTests.class, MediumTests.class })
060public class TestFlushLifeCycleTracker {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestFlushLifeCycleTracker.class);
065
066  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
067
068  private static final TableName NAME =
069    TableName.valueOf(TestFlushLifeCycleTracker.class.getSimpleName());
070
071  private static final byte[] CF = Bytes.toBytes("CF");
072
073  private static final byte[] QUALIFIER = Bytes.toBytes("CQ");
074
075  private HRegion region;
076
077  private static FlushLifeCycleTracker TRACKER;
078
079  private static volatile CountDownLatch ARRIVE;
080
081  private static volatile CountDownLatch BLOCK;
082
083  public static final class FlushObserver implements RegionObserver, RegionCoprocessor {
084
085    @Override
086    public Optional<RegionObserver> getRegionObserver() {
087      return Optional.of(this);
088    }
089
090    @Override
091    public void preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c,
092      FlushLifeCycleTracker tracker) throws IOException {
093      if (TRACKER != null) {
094        assertSame(tracker, TRACKER);
095      }
096    }
097
098    @Override
099    public InternalScanner preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c,
100      Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
101      if (TRACKER != null) {
102        assertSame(tracker, TRACKER);
103      }
104      return scanner;
105    }
106
107    @Override
108    public void postFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c,
109      FlushLifeCycleTracker tracker) throws IOException {
110      if (TRACKER != null) {
111        assertSame(tracker, TRACKER);
112      }
113    }
114
115    @Override
116    public void postFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c, Store store,
117      StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {
118      if (TRACKER != null) {
119        assertSame(tracker, TRACKER);
120      }
121      // inject here so we can make a flush request to fail because of we already have a flush
122      // ongoing.
123      CountDownLatch arrive = ARRIVE;
124      if (arrive != null) {
125        arrive.countDown();
126        try {
127          BLOCK.await();
128        } catch (InterruptedException e) {
129          throw new InterruptedIOException();
130        }
131      }
132    }
133  }
134
135  private static final class Tracker implements FlushLifeCycleTracker {
136
137    private String reason;
138
139    private boolean beforeExecutionCalled;
140
141    private boolean afterExecutionCalled;
142
143    private boolean completed = false;
144
145    @Override
146    public synchronized void notExecuted(String reason) {
147      this.reason = reason;
148      completed = true;
149      notifyAll();
150    }
151
152    @Override
153    public void beforeExecution() {
154      this.beforeExecutionCalled = true;
155    }
156
157    @Override
158    public synchronized void afterExecution() {
159      this.afterExecutionCalled = true;
160      completed = true;
161      notifyAll();
162    }
163
164    public synchronized void await() throws InterruptedException {
165      while (!completed) {
166        wait();
167      }
168    }
169  }
170
171  @BeforeClass
172  public static void setUpBeforeClass() throws Exception {
173    UTIL.startMiniCluster(3);
174  }
175
176  @AfterClass
177  public static void tearDownAfterClass() throws Exception {
178    UTIL.shutdownMiniCluster();
179  }
180
181  @Before
182  public void setUp() throws IOException {
183    UTIL.getAdmin()
184      .createTable(TableDescriptorBuilder.newBuilder(NAME)
185        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF))
186        .setCoprocessor(FlushObserver.class.getName()).build());
187    region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
188  }
189
190  @After
191  public void tearDown() throws IOException {
192    region = null;
193    TRACKER = null;
194    UTIL.deleteTable(NAME);
195  }
196
197  @Test
198  public void test() throws IOException, InterruptedException {
199    try (Table table = UTIL.getConnection().getTable(NAME)) {
200      for (int i = 0; i < 100; i++) {
201        byte[] row = Bytes.toBytes(i);
202        table.put(
203          new Put(row, true).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
204            .setFamily(CF).setQualifier(QUALIFIER).setTimestamp(HConstants.LATEST_TIMESTAMP)
205            .setType(Type.Put).setValue(Bytes.toBytes(i)).build()));
206      }
207    }
208    Tracker tracker = new Tracker();
209    TRACKER = tracker;
210    region.requestFlush(tracker);
211    tracker.await();
212    assertNull(tracker.reason);
213    assertTrue(tracker.beforeExecutionCalled);
214    assertTrue(tracker.afterExecutionCalled);
215
216    // request flush on a region with empty memstore should still success
217    tracker = new Tracker();
218    TRACKER = tracker;
219    region.requestFlush(tracker);
220    tracker.await();
221    assertNull(tracker.reason);
222    assertTrue(tracker.beforeExecutionCalled);
223    assertTrue(tracker.afterExecutionCalled);
224  }
225
226  @Test
227  public void testNotExecuted() throws IOException, InterruptedException {
228    try (Table table = UTIL.getConnection().getTable(NAME)) {
229      for (int i = 0; i < 100; i++) {
230        byte[] row = Bytes.toBytes(i);
231        table.put(
232          new Put(row, true).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
233            .setFamily(CF).setQualifier(QUALIFIER).setTimestamp(HConstants.LATEST_TIMESTAMP)
234            .setType(Type.Put).setValue(Bytes.toBytes(i)).build()));
235      }
236    }
237    // here we may have overlap when calling the CP hooks so we do not assert on TRACKER
238    Tracker tracker1 = new Tracker();
239    ARRIVE = new CountDownLatch(1);
240    BLOCK = new CountDownLatch(1);
241    region.requestFlush(tracker1);
242    ARRIVE.await();
243
244    Tracker tracker2 = new Tracker();
245    region.requestFlush(tracker2);
246    tracker2.await();
247    assertNotNull(tracker2.reason);
248    assertFalse(tracker2.beforeExecutionCalled);
249    assertFalse(tracker2.afterExecutionCalled);
250
251    BLOCK.countDown();
252    tracker1.await();
253    assertNull(tracker1.reason);
254    assertTrue(tracker1.beforeExecutionCalled);
255    assertTrue(tracker1.afterExecutionCalled);
256  }
257}