001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.hbtop.mode;
019
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import java.util.stream.Collectors;
029import org.apache.hadoop.hbase.ClusterMetrics;
030import org.apache.hadoop.hbase.ServerMetrics;
031import org.apache.hadoop.hbase.UserMetrics;
032import org.apache.hadoop.hbase.hbtop.Record;
033import org.apache.hadoop.hbase.hbtop.RecordFilter;
034import org.apache.hadoop.hbase.hbtop.field.Field;
035import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
036import org.apache.hadoop.hbase.hbtop.field.FieldValue;
037import org.apache.hadoop.hbase.hbtop.field.FieldValueType;
038import org.apache.yetus.audience.InterfaceAudience;
039
040/**
041 * Implementation for {@link ModeStrategy} for client Mode.
042 */
043@InterfaceAudience.Private
044public final class ClientModeStrategy implements ModeStrategy {
045
046  private final List<FieldInfo> fieldInfos =
047    Arrays.asList(new FieldInfo(Field.CLIENT, 0, true), new FieldInfo(Field.USER_COUNT, 5, true),
048      new FieldInfo(Field.REQUEST_COUNT_PER_SECOND, 10, true),
049      new FieldInfo(Field.READ_REQUEST_COUNT_PER_SECOND, 10, true),
050      new FieldInfo(Field.WRITE_REQUEST_COUNT_PER_SECOND, 10, true),
051      new FieldInfo(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND, 10, true));
052  private final Map<String, RequestCountPerSecond> requestCountPerSecondMap = new HashMap<>();
053
054  ClientModeStrategy() {
055  }
056
057  @Override
058  public List<FieldInfo> getFieldInfos() {
059    return fieldInfos;
060  }
061
062  @Override
063  public Field getDefaultSortField() {
064    return Field.REQUEST_COUNT_PER_SECOND;
065  }
066
067  @Override
068  public List<Record> getRecords(ClusterMetrics clusterMetrics,
069    List<RecordFilter> pushDownFilters) {
070    List<Record> records = createRecords(clusterMetrics);
071    return aggregateRecordsAndAddDistinct(
072      ModeStrategyUtils.applyFilterAndGet(records, pushDownFilters), Field.CLIENT, Field.USER,
073      Field.USER_COUNT);
074  }
075
076  List<Record> createRecords(ClusterMetrics clusterMetrics) {
077    List<Record> ret = new ArrayList<>();
078    for (ServerMetrics serverMetrics : clusterMetrics.getLiveServerMetrics().values()) {
079      long lastReportTimestamp = serverMetrics.getLastReportTimestamp();
080      serverMetrics.getUserMetrics().values()
081        .forEach(um -> um.getClientMetrics().values()
082          .forEach(clientMetrics -> ret.add(createRecord(um.getNameAsString(), clientMetrics,
083            lastReportTimestamp, serverMetrics.getServerName().getServerName()))));
084    }
085    return ret;
086  }
087
088  /**
089   * Aggregate the records and count the unique values for the given distinctField
090   * @param records               records to be processed
091   * @param groupBy               Field on which group by needs to be done
092   * @param distinctField         Field whose unique values needs to be counted
093   * @param uniqueCountAssignedTo a target field to which the unique count is assigned to
094   * @return aggregated records
095   */
096  List<Record> aggregateRecordsAndAddDistinct(List<Record> records, Field groupBy,
097    Field distinctField, Field uniqueCountAssignedTo) {
098    List<Record> result = new ArrayList<>();
099    records.stream().collect(Collectors.groupingBy(r -> r.get(groupBy))).values().forEach(val -> {
100      Set<FieldValue> distinctValues = new HashSet<>();
101      Map<Field, FieldValue> map = new HashMap<>();
102      for (Record record : val) {
103        for (Map.Entry<Field, FieldValue> field : record.entrySet()) {
104          if (distinctField.equals(field.getKey())) {
105            // We will not be adding the field in the new record whose distinct count is required
106            distinctValues.add(record.get(distinctField));
107          } else {
108            if (field.getKey().getFieldValueType() == FieldValueType.STRING) {
109              map.put(field.getKey(), field.getValue());
110            } else {
111              if (map.get(field.getKey()) == null) {
112                map.put(field.getKey(), field.getValue());
113              } else {
114                map.put(field.getKey(), map.get(field.getKey()).plus(field.getValue()));
115              }
116            }
117          }
118        }
119      }
120      // Add unique count field
121      map.put(uniqueCountAssignedTo, uniqueCountAssignedTo.newValue(distinctValues.size()));
122      result.add(
123        Record.ofEntries(map.entrySet().stream().map(k -> Record.entry(k.getKey(), k.getValue()))));
124    });
125    return result;
126  }
127
128  Record createRecord(String user, UserMetrics.ClientMetrics clientMetrics,
129    long lastReportTimestamp, String server) {
130    Record.Builder builder = Record.builder();
131    String client = clientMetrics.getHostName();
132    builder.put(Field.CLIENT, clientMetrics.getHostName());
133    String mapKey = client + "$" + user + "$" + server;
134    RequestCountPerSecond requestCountPerSecond = requestCountPerSecondMap.get(mapKey);
135    if (requestCountPerSecond == null) {
136      requestCountPerSecond = new RequestCountPerSecond();
137      requestCountPerSecondMap.put(mapKey, requestCountPerSecond);
138    }
139    requestCountPerSecond.refresh(lastReportTimestamp, clientMetrics.getReadRequestsCount(),
140      clientMetrics.getFilteredReadRequestsCount(), clientMetrics.getWriteRequestsCount());
141    builder.put(Field.REQUEST_COUNT_PER_SECOND, requestCountPerSecond.getRequestCountPerSecond());
142    builder.put(Field.READ_REQUEST_COUNT_PER_SECOND,
143      requestCountPerSecond.getReadRequestCountPerSecond());
144    builder.put(Field.WRITE_REQUEST_COUNT_PER_SECOND,
145      requestCountPerSecond.getWriteRequestCountPerSecond());
146    builder.put(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND,
147      requestCountPerSecond.getFilteredReadRequestCountPerSecond());
148    builder.put(Field.USER, user);
149    return builder.build();
150  }
151
152  @Override
153  public DrillDownInfo drillDown(Record selectedRecord) {
154    List<RecordFilter> initialFilters = Collections.singletonList(
155      RecordFilter.newBuilder(Field.CLIENT).doubleEquals(selectedRecord.get(Field.CLIENT)));
156    return new DrillDownInfo(Mode.USER, initialFilters);
157  }
158}