未验证 提交 bb590dab 编写于 作者: C chen~ 提交者: GitHub

Include event(s) to alarms. (#6888)

Co-authored-by: Nkezhenxu94 <kezhenxu94@apache.org>
Co-authored-by: wu-sheng's avatar吴晟 Wu Sheng <wu.sheng@foxmail.com>
上级 d14148d4
......@@ -45,6 +45,7 @@ Release Notes.
* Fix: NPE when configmap has no data.
* Fix: Dynamic Configuration key `slowTraceSegmentThreshold` not work
* Fix: `!=` is not supported in oal when parameters are numbers.
* Include events of the entity(s) in the alarm.
#### UI
* Add logo for kong plugin.
......
......@@ -26,6 +26,9 @@ import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
import java.util.List;
import java.util.Objects;
import static java.util.Objects.isNull;
import static org.apache.skywalking.apm.util.StringUtil.isBlank;
......@@ -53,6 +56,14 @@ public class EventQueryService implements Service {
return getDao().queryEvents(condition);
}
public Events queryEvents(final List<EventQueryCondition> conditions) throws Exception {
EventQueryCondition condition = conditions.stream().filter(c -> isBlank(c.getUuid()) && isDurationInvalid(c.getTime())).findFirst().orElse(null);
if (Objects.nonNull(condition)) {
throw new IllegalArgumentException("time field is required when uuid is absent.");
}
return getDao().queryEvents(conditions);
}
boolean isDurationInvalid(final Duration duration) {
return isNull(duration) || (isBlank(duration.getStart()) || isBlank(duration.getEnd()));
}
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.query.type;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
import org.apache.skywalking.oap.server.core.query.type.event.Event;
import java.util.ArrayList;
import java.util.List;
......@@ -33,7 +34,9 @@ public class AlarmMessage {
private String id;
private String message;
private Long startTime;
private transient String id1;
private final List<KeyValue> tags;
private List<Event> events = new ArrayList<>(2);
public AlarmMessage() {
tags = new ArrayList<>();
......
......@@ -58,5 +58,4 @@ public class Event {
this.parameters = map.entrySet().stream().map(e -> new KeyValue(e.getKey(), e.getValue())).collect(Collectors.toList());
}
}
}
......@@ -18,7 +18,10 @@
package org.apache.skywalking.oap.server.core.query.type.event;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
import org.apache.skywalking.oap.server.core.query.input.Duration;
......@@ -26,6 +29,9 @@ import static org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO
import static org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO.MAX_SIZE;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder(toBuilder = true)
public class EventQueryCondition {
private String uuid;
......
......@@ -19,10 +19,12 @@
package org.apache.skywalking.oap.server.core.query.type.event;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Source {
......
......@@ -22,9 +22,13 @@ import org.apache.skywalking.oap.server.core.query.type.event.EventQueryConditio
import org.apache.skywalking.oap.server.core.query.type.event.Events;
import org.apache.skywalking.oap.server.core.storage.DAO;
import java.util.List;
public interface IEventQueryDAO extends DAO {
int DEFAULT_SIZE = 20;
int MAX_SIZE = 100;
Events queryEvents(final EventQueryCondition condition) throws Exception;
Events queryEvents(final List<EventQueryCondition> conditionList) throws Exception;
}
......@@ -19,26 +19,43 @@
package org.apache.skywalking.oap.query.graphql.resolver;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
import org.apache.skywalking.oap.server.core.query.EventQueryService;
import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
import org.apache.skywalking.oap.server.core.query.type.AlarmTrend;
import org.apache.skywalking.oap.server.core.query.type.Alarms;
import org.apache.skywalking.oap.server.core.query.type.Pagination;
import org.apache.skywalking.oap.server.core.query.type.event.Event;
import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
import org.apache.skywalking.oap.server.core.query.type.event.Source;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isNotEmpty;
public class AlarmQuery implements GraphQLQueryResolver {
private final ModuleManager moduleManager;
private AlarmQueryService queryService;
private EventQueryService eventQueryService;
public AlarmQuery(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
......@@ -50,23 +67,125 @@ public class AlarmQuery implements GraphQLQueryResolver {
return queryService;
}
private EventQueryService getEventQueryService() {
if (eventQueryService == null) {
this.eventQueryService = moduleManager.find(CoreModule.NAME).provider().getService(EventQueryService.class);
}
return eventQueryService;
}
public AlarmTrend getAlarmTrend(final Duration duration) {
return new AlarmTrend();
}
public Alarms getAlarm(final Duration duration, final Scope scope, final String keyword,
final Pagination paging, final List<Tag> tags) throws IOException {
final Pagination paging, final List<Tag> tags) throws Exception {
Integer scopeId = null;
if (scope != null) {
scopeId = scope.getScopeId();
}
long startSecondTB = 0;
long endSecondTB = 0;
final EventQueryCondition.EventQueryConditionBuilder conditionPrototype = EventQueryCondition.builder().size(IEventQueryDAO.MAX_SIZE);
if (nonNull(duration)) {
startSecondTB = duration.getStartTimeBucketInSec();
endSecondTB = duration.getEndTimeBucketInSec();
conditionPrototype.time(duration);
}
return getQueryService().getAlarm(
Alarms alarms = getQueryService().getAlarm(
scopeId, keyword, paging, startSecondTB, endSecondTB, tags);
return findRelevantEvents(alarms, conditionPrototype);
}
private Alarms findRelevantEvents(
final Alarms alarms,
final EventQueryCondition.EventQueryConditionBuilder conditionPrototype
) throws Exception {
if (alarms.getTotal() < 1) {
return alarms;
}
final List<EventQueryCondition> allConditions =
alarms.getMsgs()
.stream()
.flatMap(m -> buildEventSources(m).stream().map(conditionPrototype::source))
.map(EventQueryCondition.EventQueryConditionBuilder::build)
.collect(Collectors.toList());
final List<Event> events = getEventQueryService().queryEvents(allConditions).getEvents();
final Map<String, List<Event>> eventsKeyedBySourceId =
events.stream()
.filter(it -> !isNullOrEmpty(buildSourceID(it)))
.collect(Collectors.groupingBy(this::buildSourceID));
alarms.getMsgs().forEach(a -> {
if (isNotEmpty(eventsKeyedBySourceId.get(a.getId()))) {
a.getEvents().addAll(eventsKeyedBySourceId.get(a.getId()));
}
if (isNotEmpty(eventsKeyedBySourceId.get(a.getId1()))) {
a.getEvents().addAll(eventsKeyedBySourceId.get(a.getId1()));
}
});
return alarms;
}
private List<Source> buildEventSources(AlarmMessage msg) {
final List<Source> sources = new ArrayList<>(2);
final Source.SourceBuilder sourcePrototype = Source.builder();
switch (msg.getScopeId()) {
case DefaultScopeDefine.SERVICE_RELATION:
final IDManager.ServiceID.ServiceIDDefinition destServiceIdDef = IDManager.ServiceID.analysisId(msg.getId1());
sources.add(sourcePrototype.service(destServiceIdDef.getName()).build());
// fall through
case DefaultScopeDefine.SERVICE:
final IDManager.ServiceID.ServiceIDDefinition sourceServiceIdDef = IDManager.ServiceID.analysisId(msg.getId());
sources.add(sourcePrototype.service(sourceServiceIdDef.getName()).build());
break;
case DefaultScopeDefine.SERVICE_INSTANCE_RELATION:
final IDManager.ServiceInstanceID.InstanceIDDefinition destInstanceIdDef = IDManager.ServiceInstanceID.analysisId(msg.getId1());
final String destServiceName = IDManager.ServiceID.analysisId(destInstanceIdDef.getServiceId()).getName();
sources.add(sourcePrototype.service(destServiceName).serviceInstance(destInstanceIdDef.getName()).build());
// fall through
case DefaultScopeDefine.SERVICE_INSTANCE:
final IDManager.ServiceInstanceID.InstanceIDDefinition sourceInstanceIdDef = IDManager.ServiceInstanceID.analysisId(msg.getId());
final String serviceName = IDManager.ServiceID.analysisId(sourceInstanceIdDef.getServiceId()).getName();
sources.add(sourcePrototype.serviceInstance(sourceInstanceIdDef.getName()).service(serviceName).build());
break;
case DefaultScopeDefine.ENDPOINT_RELATION:
final IDManager.EndpointID.EndpointIDDefinition destEndpointIDDef = IDManager.EndpointID.analysisId(msg.getId1());
final String destEndpointServiceName = IDManager.ServiceID.analysisId(destEndpointIDDef.getServiceId()).getName();
sources.add(sourcePrototype.service(destEndpointServiceName).build());
// fall through
case DefaultScopeDefine.ENDPOINT:
final IDManager.EndpointID.EndpointIDDefinition endpointIDDef = IDManager.EndpointID.analysisId(msg.getId());
final String endpointServiceName = IDManager.ServiceID.analysisId(endpointIDDef.getServiceId()).getName();
sources.add(sourcePrototype.service(endpointServiceName).build());
break;
}
return sources;
}
protected String buildSourceID(final Event event) {
final Source source = event.getSource();
if (isNull(source)) {
return "";
}
final String service = source.getService();
if (isNullOrEmpty(service)) {
return "";
}
final String instance = source.getServiceInstance();
if (isNullOrEmpty(instance)) {
return IDManager.ServiceID.buildId(service, true);
}
return IDManager.ServiceInstanceID.buildId(service, instance);
}
}
Subproject commit 0c2388ba18cfc1b1b103ddad71f9765bd21dff6e
Subproject commit 84c635180b8dde4210865f655b83d101c68fe741
......@@ -90,6 +90,7 @@ public class AlarmQueryEsDAO extends EsDAO implements IAlarmQueryDAO {
AlarmMessage message = new AlarmMessage();
message.setId(String.valueOf(alarmRecord.getId0()));
message.setId1(String.valueOf(alarmRecord.getId1()));
message.setMessage(alarmRecord.getAlarmMessage());
message.setStartTime(alarmRecord.getStartTime());
message.setScope(Scope.Finder.valueOf(alarmRecord.getScope()));
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
......@@ -53,26 +54,27 @@ public class ESEventQueryDAO extends EsDAO implements IEventQueryDAO {
@Override
public Events queryEvents(final EventQueryCondition condition) throws Exception {
final SearchSourceBuilder sourceBuilder = buildQuery(condition);
return getEventsResultByCurrentBuilder(sourceBuilder);
}
final SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(Event.INDEX_NAME), sourceBuilder);
@Override
public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception {
final SearchSourceBuilder sourceBuilder = buildQuery(conditionList);
return getEventsResultByCurrentBuilder(sourceBuilder);
}
private Events getEventsResultByCurrentBuilder(final SearchSourceBuilder sourceBuilder) throws IOException {
final SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(Event.INDEX_NAME), sourceBuilder);
final Events events = new Events();
events.setTotal((int) response.getHits().totalHits);
events.setEvents(Stream.of(response.getHits().getHits())
.map(this::parseSearchHit)
.collect(Collectors.toList()));
.map(this::parseSearchHit)
.collect(Collectors.toList()));
return events;
}
protected SearchSourceBuilder buildQuery(final EventQueryCondition condition) {
final SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
final BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(boolQueryBuilder);
final List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
private void buildMustQueryListByCondition(final EventQueryCondition condition, final List<QueryBuilder> mustQueryList) {
if (!isNullOrEmpty(condition.getUuid())) {
mustQueryList.add(QueryBuilders.termQuery(Event.UUID, condition.getUuid()));
}
......@@ -87,8 +89,8 @@ public class ESEventQueryDAO extends EsDAO implements IEventQueryDAO {
}
if (!isNullOrEmpty(source.getEndpoint())) {
mustQueryList.add(QueryBuilders.matchPhraseQuery(
MatchCNameBuilder.INSTANCE.build(Event.ENDPOINT),
source.getEndpoint()
MatchCNameBuilder.INSTANCE.build(Event.ENDPOINT),
source.getEndpoint()
));
}
}
......@@ -105,13 +107,40 @@ public class ESEventQueryDAO extends EsDAO implements IEventQueryDAO {
if (startTime != null) {
if (startTime.getStartTimestamp() > 0) {
mustQueryList.add(QueryBuilders.rangeQuery(Event.START_TIME)
.gt(startTime.getStartTimestamp()));
.gt(startTime.getStartTimestamp()));
}
if (startTime.getEndTimestamp() > 0) {
mustQueryList.add(QueryBuilders.rangeQuery(Event.END_TIME)
.lt(startTime.getEndTimestamp()));
.lt(startTime.getEndTimestamp()));
}
}
}
protected SearchSourceBuilder buildQuery(final List<EventQueryCondition> conditionList) {
final SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder linkShouldBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(linkShouldBuilder);
conditionList.forEach(condition -> {
final BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
final List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
linkShouldBuilder.should(boolQueryBuilder);
buildMustQueryListByCondition(condition, mustQueryList);
});
EventQueryCondition condition = conditionList.get(0);
final Order queryOrder = isNull(condition.getOrder()) ? Order.DES : condition.getOrder();
sourceBuilder.sort(Event.START_TIME, Order.DES.equals(queryOrder) ? SortOrder.DESC : SortOrder.ASC);
sourceBuilder.size(condition.getSize());
return sourceBuilder;
}
protected SearchSourceBuilder buildQuery(final EventQueryCondition condition) {
final SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
final BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(boolQueryBuilder);
final List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
buildMustQueryListByCondition(condition, mustQueryList);
final Order queryOrder = isNull(condition.getOrder()) ? Order.DES : condition.getOrder();
sourceBuilder.sort(Event.START_TIME, Order.DES.equals(queryOrder) ? SortOrder.DESC : SortOrder.ASC);
......
......@@ -92,6 +92,7 @@ public class AlarmQueryEs7DAO extends EsDAO implements IAlarmQueryDAO {
AlarmMessage message = new AlarmMessage();
message.setId(String.valueOf(alarmRecord.getId0()));
message.setId1(String.valueOf(alarmRecord.getId1()));
message.setMessage(alarmRecord.getAlarmMessage());
message.setStartTime(alarmRecord.getStartTime());
message.setScope(Scope.Finder.valueOf(alarmRecord.getScope()));
......
......@@ -18,6 +18,8 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.skywalking.oap.server.core.event.Event;
......@@ -37,16 +39,24 @@ public class ES7EventQueryDAO extends ESEventQueryDAO {
@Override
public Events queryEvents(final EventQueryCondition condition) throws Exception {
final SearchSourceBuilder sourceBuilder = buildQuery(condition);
return getEventsResultByCurrentBuilder(sourceBuilder);
}
@Override
public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception {
final SearchSourceBuilder sourceBuilder = buildQuery(conditionList);
return getEventsResultByCurrentBuilder(sourceBuilder);
}
private Events getEventsResultByCurrentBuilder(final SearchSourceBuilder sourceBuilder) throws IOException {
final SearchResponse response = getClient()
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(Event.INDEX_NAME), sourceBuilder);
.search(IndexController.LogicIndicesRegister.getPhysicalTableName(Event.INDEX_NAME), sourceBuilder);
final Events events = new Events();
events.setTotal(response.getHits().getTotalHits().value);
events.setEvents(Stream.of(response.getHits().getHits())
.map(this::parseSearchHit)
.collect(Collectors.toList()));
.map(this::parseSearchHit)
.collect(Collectors.toList()));
return events;
}
}
......@@ -19,9 +19,11 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.event.Event;
......@@ -38,6 +40,7 @@ import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereNested;
import org.influxdb.querybuilder.WhereQueryImpl;
import static com.google.common.base.Strings.isNullOrEmpty;
......@@ -55,35 +58,24 @@ public class EventQueryDAO implements IEventQueryDAO {
@Override
public Events queryEvents(final EventQueryCondition condition) throws Exception {
final WhereQueryImpl<SelectQueryImpl> recallQuery = buildQuery(condition);
List<WhereQueryImpl<SelectQueryImpl>> whereQueries = buildWhereQueries(condition);
final SelectQueryImpl countQuery = select().count(Event.UUID).from(client.getDatabase(), Event.INDEX_NAME);
recallQuery.getClauses().forEach(countQuery::where);
buildQueryByCondition(whereQueries, condition);
final Query query = new Query(countQuery.getCommand() + recallQuery.getCommand());
final List<QueryResult.Result> results = client.query(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {}", query.getCommand());
log.debug("Result: {}", results);
}
if (results.size() != 2) {
throw new IOException("Expecting to get 2 Results, but it is " + results.size());
}
List<QueryResult.Result> results = execute(whereQueries.get(0), whereQueries.get(1));
final QueryResult.Series counterSeries = results.get(0).getSeries().get(0);
final List<QueryResult.Series> recallSeries = results.get(1).getSeries();
return buildEventsByQueryResult(results);
}
final Events events = new Events();
@Override
public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception {
List<WhereQueryImpl<SelectQueryImpl>> whereQueries = buildWhereQueries(conditionList.get(0));
events.setTotal(((Number) counterSeries.getValues().get(0).get(1)).longValue());
buildQueryByCondition(whereQueries, conditionList);
recallSeries.forEach(
series -> series.getValues().forEach(
values -> events.getEvents().add(parseSeriesValues(series, values))
)
);
List<QueryResult.Result> results = execute(whereQueries.get(0), whereQueries.get(1));
return events;
return buildEventsByQueryResult(results);
}
protected org.apache.skywalking.oap.server.core.query.type.event.Event parseSeriesValues(final QueryResult.Series series, final List<Object> values) {
......@@ -116,49 +108,147 @@ public class EventQueryDAO implements IEventQueryDAO {
return event;
}
protected WhereQueryImpl<SelectQueryImpl> buildQuery(final EventQueryCondition condition) {
protected List<WhereQueryImpl<SelectQueryImpl>> buildWhereQueries(final EventQueryCondition condition) {
List<WhereQueryImpl<SelectQueryImpl>> queries = new ArrayList<>(2);
final String topFunc = Order.DES.equals(condition.getOrder()) ? InfluxConstants.SORT_DES : InfluxConstants.SORT_ASC;
final WhereQueryImpl<SelectQueryImpl> query =
select().raw(ALL_FIELDS)
.function(topFunc, Event.START_TIME, condition.getSize())
.from(client.getDatabase(), Event.INDEX_NAME)
.where();
final WhereQueryImpl<SelectQueryImpl> recallWhereQuery =
select().raw(ALL_FIELDS)
.function(topFunc, Event.START_TIME, condition.getSize())
.from(client.getDatabase(), Event.INDEX_NAME)
.where();
final SelectQueryImpl countQuery = select().count(Event.UUID).from(client.getDatabase(), Event.INDEX_NAME);
final WhereQueryImpl<SelectQueryImpl> countWhereQuery = countQuery.where();
queries.add(countWhereQuery);
queries.add(recallWhereQuery);
return queries;
}
protected void buildQueryByCondition(List<WhereQueryImpl<SelectQueryImpl>> queries, EventQueryCondition condition) {
WhereQueryImpl<SelectQueryImpl> countWhereQuery = queries.get(0);
WhereQueryImpl<SelectQueryImpl> recallWhereQuery = queries.get(1);
if (!isNullOrEmpty(condition.getUuid())) {
query.and(eq(Event.UUID, condition.getUuid()));
recallWhereQuery.and(eq(Event.UUID, condition.getUuid()));
countWhereQuery.and(eq(Event.UUID, condition.getUuid()));
}
final Source source = condition.getSource();
if (source != null) {
if (!isNullOrEmpty(source.getService())) {
query.and(eq(Event.SERVICE, source.getService()));
recallWhereQuery.and(eq(Event.SERVICE, source.getService()));
countWhereQuery.and(eq(Event.SERVICE, source.getService()));
}
if (!isNullOrEmpty(source.getServiceInstance())) {
query.and(eq(Event.SERVICE_INSTANCE, source.getServiceInstance()));
recallWhereQuery.and(eq(Event.SERVICE_INSTANCE, source.getServiceInstance()));
countWhereQuery.and(eq(Event.SERVICE_INSTANCE, source.getServiceInstance()));
}
if (!isNullOrEmpty(source.getEndpoint())) {
query.and(contains(Event.ENDPOINT, source.getEndpoint().replaceAll("/", "\\\\/")));
recallWhereQuery.and(contains(Event.ENDPOINT, source.getEndpoint().replaceAll("/", "\\\\/")));
countWhereQuery.and(contains(Event.ENDPOINT, source.getEndpoint().replaceAll("/", "\\\\/")));
}
}
if (!isNullOrEmpty(condition.getName())) {
query.and(eq(InfluxConstants.NAME, condition.getName()));
recallWhereQuery.and(eq(InfluxConstants.NAME, condition.getName()));
countWhereQuery.and(eq(InfluxConstants.NAME, condition.getName()));
}
if (condition.getType() != null) {
query.and(eq(Event.TYPE, condition.getType().name()));
recallWhereQuery.and(eq(Event.TYPE, condition.getType().name()));
countWhereQuery.and(eq(Event.TYPE, condition.getType().name()));
}
final Duration startTime = condition.getTime();
if (startTime != null) {
if (startTime.getStartTimestamp() > 0) {
query.and(gt(Event.START_TIME, startTime.getStartTimestamp()));
recallWhereQuery.and(gt(Event.START_TIME, startTime.getStartTimestamp()));
countWhereQuery.and(gt(Event.START_TIME, startTime.getStartTimestamp()));
}
if (startTime.getEndTimestamp() > 0) {
query.and(lt(Event.END_TIME, startTime.getEndTimestamp()));
recallWhereQuery.and(lt(Event.END_TIME, startTime.getEndTimestamp()));
countWhereQuery.and(lt(Event.END_TIME, startTime.getEndTimestamp()));
}
}
}
protected void buildQueryByCondition(List<WhereQueryImpl<SelectQueryImpl>> queries, List<EventQueryCondition> conditions) {
WhereQueryImpl<SelectQueryImpl> countWhereQuery = queries.get(0);
WhereQueryImpl<SelectQueryImpl> recallWhereQuery = queries.get(1);
conditions.stream().forEach(c -> {
WhereNested<WhereQueryImpl<SelectQueryImpl>> recallOrNested = recallWhereQuery.orNested();
WhereNested<WhereQueryImpl<SelectQueryImpl>> countOrNested = countWhereQuery.orNested();
// by current condition, we should not have uuid. If one day you need to use UUIDs as the query condition, this might be applied.
if (!isNullOrEmpty(c.getUuid())) {
recallWhereQuery.and(eq(Event.UUID, c.getUuid()));
countWhereQuery.and(eq(Event.UUID, c.getUuid()));
}
final Source source = c.getSource();
if (source != null) {
if (!isNullOrEmpty(source.getService())) {
recallOrNested.and(eq(Event.SERVICE, source.getService()));
countOrNested.and(eq(Event.SERVICE, source.getService()));
}
if (!isNullOrEmpty(source.getServiceInstance())) {
recallOrNested.and(eq(Event.SERVICE_INSTANCE, source.getServiceInstance()));
countOrNested.and(eq(Event.SERVICE_INSTANCE, source.getServiceInstance()));
}
if (!isNullOrEmpty(source.getEndpoint())) {
recallOrNested.and(contains(Event.ENDPOINT, source.getEndpoint().replaceAll("/", "\\\\/")));
countOrNested.and(contains(Event.ENDPOINT, source.getEndpoint().replaceAll("/", "\\\\/")));
}
}
if (!isNullOrEmpty(c.getName())) {
recallOrNested.and(eq(InfluxConstants.NAME, c.getName()));
countOrNested.and(eq(InfluxConstants.NAME, c.getName()));
}
if (c.getType() != null) {
recallOrNested.and(eq(Event.TYPE, c.getType().name()));
countOrNested.and(eq(Event.TYPE, c.getType().name()));
}
final Duration startTime = c.getTime();
if (startTime != null) {
if (startTime.getStartTimestamp() > 0) {
recallOrNested.and(gt(Event.START_TIME, startTime.getStartTimestamp()));
countOrNested.and(gt(Event.START_TIME, startTime.getStartTimestamp()));
}
if (startTime.getEndTimestamp() > 0) {
recallOrNested.and(lt(Event.END_TIME, startTime.getEndTimestamp()));
countOrNested.and(lt(Event.END_TIME, startTime.getEndTimestamp()));
}
}
recallOrNested.close();
countOrNested.close();
});
}
protected List<QueryResult.Result> execute(WhereQueryImpl<SelectQueryImpl> countWhereQuery, WhereQueryImpl<SelectQueryImpl> recallWhereQuery) throws IOException {
final Query query = new Query(countWhereQuery.getCommand() + recallWhereQuery.getCommand());
final List<QueryResult.Result> results = client.query(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {}", query.getCommand());
log.debug("Result: {}", results);
}
if (results.size() != 2) {
throw new IOException("Expecting to get 2 Results, but it is " + results.size());
}
return results;
}
protected Events buildEventsByQueryResult(List<QueryResult.Result> results) {
final QueryResult.Series counterSeries = results.get(0).getSeries().get(0);
final List<QueryResult.Series> recallSeries = results.get(1).getSeries();
return query;
final Events events = new Events();
events.setTotal(((Number) counterSeries.getValues().get(0).get(1)).longValue());
recallSeries.forEach(
series -> series.getValues().forEach(
values -> events.getEvents().add(parseSeriesValues(series, values))
)
);
return events;
}
}
......@@ -128,6 +128,7 @@ public class H2AlarmQueryDAO implements IAlarmQueryDAO {
while (resultSet.next()) {
AlarmMessage message = new AlarmMessage();
message.setId(resultSet.getString(AlarmRecord.ID0));
message.setId1(resultSet.getString(AlarmRecord.ID1));
message.setMessage(resultSet.getString(AlarmRecord.ALARM_MESSAGE));
message.setStartTime(resultSet.getLong(AlarmRecord.START_TIME));
message.setScope(Scope.Finder.valueOf(resultSet.getInt(AlarmRecord.SCOPE)));
......
......@@ -18,20 +18,22 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.event.Event;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
import org.apache.skywalking.oap.server.core.query.type.event.EventType;
import org.apache.skywalking.oap.server.core.query.type.event.Events;
import org.apache.skywalking.oap.server.core.query.type.event.Source;
import org.apache.skywalking.oap.server.core.query.type.event.EventType;
import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
......@@ -44,79 +46,76 @@ public class H2EventQueryDAO implements IEventQueryDAO {
@Override
public Events queryEvents(final EventQueryCondition condition) throws Exception {
final List<String> conditions = new ArrayList<>();
final List<Object> parameters = new ArrayList<>();
final Tuple2<Stream<String>, Stream<Object>> conditionsParametersPair = buildQuery(condition);
final Stream<String> conditions = conditionsParametersPair._1();
final Object[] parameters = conditionsParametersPair._2().toArray();
final String whereClause = conditions.collect(Collectors.joining(" and ", " where ", ""));
if (!isNullOrEmpty(condition.getUuid())) {
conditions.add(Event.UUID + "=?");
parameters.add(condition.getUuid());
}
final Events result = new Events();
final Source source = condition.getSource();
if (source != null) {
if (!isNullOrEmpty(source.getService())) {
conditions.add(Event.SERVICE + "=?");
parameters.add(source.getService());
}
if (!isNullOrEmpty(source.getServiceInstance())) {
conditions.add(Event.SERVICE_INSTANCE + "=?");
parameters.add(source.getServiceInstance());
try (final Connection connection = client.getConnection()) {
String sql = "select count(1) total from " + Event.INDEX_NAME + whereClause;
if (log.isDebugEnabled()) {
log.debug("Count SQL: {}, parameters: {}", sql, parameters);
}
if (!isNullOrEmpty(source.getEndpoint())) {
conditions.add(Event.ENDPOINT + "=?");
parameters.add(source.getEndpoint());
try (final ResultSet resultSet = client.executeQuery(connection, sql, parameters)) {
if (!resultSet.next()) {
return result;
}
result.setTotal(resultSet.getInt("total"));
}
}
if (!isNullOrEmpty(condition.getName())) {
conditions.add(Event.NAME + "=?");
parameters.add(condition.getName());
}
if (condition.getType() != null) {
conditions.add(Event.TYPE + "=?");
parameters.add(condition.getType().name());
}
final Duration time = condition.getTime();
if (time != null) {
if (time.getStartTimestamp() > 0) {
conditions.add(Event.START_TIME + ">?");
parameters.add(time.getStartTimestamp());
sql = "select * from " + Event.INDEX_NAME + whereClause + " limit " + condition.getSize();
if (log.isDebugEnabled()) {
log.debug("Query SQL: {}, parameters: {}", sql, parameters);
}
if (time.getEndTimestamp() > 0) {
conditions.add(Event.END_TIME + "<?");
parameters.add(time.getEndTimestamp());
try (final ResultSet resultSet = client.executeQuery(connection, sql, parameters)) {
while (resultSet.next()) {
result.getEvents().add(parseResultSet(resultSet));
}
}
}
final String whereClause = conditions.isEmpty() ? "" : conditions.stream().collect(Collectors.joining(" and ", " where ", ""));
return result;
}
@Override
public Events queryEvents(List<EventQueryCondition> conditions) throws Exception {
final List<Tuple2<Stream<String>, Stream<Object>>> conditionsParametersPair = conditions.stream()
.map(this::buildQuery)
.collect(Collectors.toList());
final Object[] parameters = conditionsParametersPair.stream()
.map(Tuple2::_2)
.reduce(Stream.empty(), Stream::concat)
.toArray();
final String whereClause = conditionsParametersPair.stream()
.map(Tuple2::_1)
.map(it -> it.collect(Collectors.joining(" and ")))
.collect(Collectors.joining(" or ", " where ", ""));
final int size = conditions.stream().mapToInt(EventQueryCondition::getSize).sum();
final Events result = new Events();
try (final Connection connection = client.getConnection()) {
String sql = "select count(1) total from " + Event.INDEX_NAME + whereClause;
if (log.isDebugEnabled()) {
log.debug("Count SQL: {}, parameters: {}", sql, parameters);
}
try (final ResultSet resultSet = client.executeQuery(connection, sql, parameters.toArray())) {
try (final ResultSet resultSet = client.executeQuery(connection, sql, parameters)) {
if (!resultSet.next()) {
return result;
}
result.setTotal(resultSet.getInt("total"));
}
sql = "select * from " + Event.INDEX_NAME + whereClause + " limit " + condition.getSize();
sql = "select * from " + Event.INDEX_NAME + whereClause + " limit " + size;
if (log.isDebugEnabled()) {
log.debug("Query SQL: {}, parameters: {}", sql, parameters);
}
try (final ResultSet resultSet = client.executeQuery(connection, sql, parameters.toArray())) {
try (final ResultSet resultSet = client.executeQuery(connection, sql, parameters)) {
while (resultSet.next()) {
result.getEvents().add(parseResultSet(resultSet));
}
}
}
return result;
}
......@@ -139,4 +138,54 @@ public class H2EventQueryDAO implements IEventQueryDAO {
return event;
}
protected Tuple2<Stream<String>, Stream<Object>> buildQuery(final EventQueryCondition condition) {
final Stream.Builder<String> conditions = Stream.builder();
final Stream.Builder<Object> parameters = Stream.builder();
if (!isNullOrEmpty(condition.getUuid())) {
conditions.add(Event.UUID + "=?");
parameters.add(condition.getUuid());
}
final Source source = condition.getSource();
if (source != null) {
if (!isNullOrEmpty(source.getService())) {
conditions.add(Event.SERVICE + "=?");
parameters.add(source.getService());
}
if (!isNullOrEmpty(source.getServiceInstance())) {
conditions.add(Event.SERVICE_INSTANCE + "=?");
parameters.add(source.getServiceInstance());
}
if (!isNullOrEmpty(source.getEndpoint())) {
conditions.add(Event.ENDPOINT + "=?");
parameters.add(source.getEndpoint());
}
}
if (!isNullOrEmpty(condition.getName())) {
conditions.add(Event.NAME + "=?");
parameters.add(condition.getName());
}
if (condition.getType() != null) {
conditions.add(Event.TYPE + "=?");
parameters.add(condition.getType().name());
}
final Duration time = condition.getTime();
if (time != null) {
if (time.getStartTimestamp() > 0) {
conditions.add(Event.START_TIME + ">?");
parameters.add(time.getStartTimestamp());
}
if (time.getEndTimestamp() > 0) {
conditions.add(Event.END_TIME + "<?");
parameters.add(time.getEndTimestamp());
}
}
return Tuple.of(conditions.build(), parameters.build());
}
}
......@@ -18,6 +18,8 @@
package org.apache.skywalking.e2e;
import org.springframework.util.StringUtils;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
......@@ -32,6 +34,7 @@ public abstract class AbstractQuery<T extends AbstractQuery<T>> {
private String start;
private String end;
private String step = "SECOND";
private String name;
public String start() {
if (start != null) {
......@@ -135,4 +138,16 @@ public abstract class AbstractQuery<T extends AbstractQuery<T>> {
this.step = "SECOND";
return (T) this;
}
public String name() {
if (!StringUtils.isEmpty(name)) {
return name;
}
return null;
}
public T name(String name) {
this.name = name;
return (T) this;
}
}
......@@ -20,6 +20,7 @@ package org.apache.skywalking.e2e.alarm;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.skywalking.e2e.common.KeyValue;
import org.apache.skywalking.e2e.event.Event;
import java.util.List;
......@@ -31,4 +32,5 @@ public class Alarm {
private String id;
private String message;
private List<KeyValue> tags;
private List<Event> events;
}
......@@ -18,15 +18,20 @@
package org.apache.skywalking.e2e.alarm;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.e2e.common.KeyValue;
import org.apache.skywalking.e2e.common.KeyValueMatcher;
import org.apache.skywalking.e2e.event.Event;
import org.apache.skywalking.e2e.event.EventMatcher;
import org.apache.skywalking.e2e.verification.AbstractMatcher;
import org.springframework.util.CollectionUtils;
import java.util.List;
import static java.util.Objects.nonNull;
import static org.assertj.core.api.Assertions.fail;
@Slf4j
@Data
public class AlarmMatcher extends AbstractMatcher<Alarm> {
private String startTime;
......@@ -34,10 +39,10 @@ public class AlarmMatcher extends AbstractMatcher<Alarm> {
private String id;
private String message;
private List<KeyValueMatcher> tags;
private List<EventMatcher> events;
@Override
public void verify(Alarm alarm) {
doVerify(this.startTime, alarm.getStartTime());
doVerify(this.scope, alarm.getScope());
doVerify(this.id, alarm.getId());
doVerify(this.message, alarm.getMessage());
......@@ -57,5 +62,22 @@ public class AlarmMatcher extends AbstractMatcher<Alarm> {
}
}
}
if (!CollectionUtils.isEmpty(getEvents())) {
for (final EventMatcher matcher : getEvents()) {
boolean matched = false;
for (final Event event : alarm.getEvents()) {
try {
matcher.verify(event);
matched = true;
} catch (Throwable ignore) {
//ignore.
}
}
if (!matched) {
fail("\nExpected: %s\n Actual: %s", getEvents(), alarm.getEvents());
}
}
}
}
}
......@@ -18,6 +18,7 @@
package org.apache.skywalking.e2e.alarm;
import org.apache.skywalking.e2e.AbstractQuery;
import org.apache.skywalking.e2e.event.Event;
import java.util.ArrayList;
import java.util.Collections;
......@@ -28,15 +29,26 @@ import java.util.HashMap;
public class AlarmQuery extends AbstractQuery<AlarmQuery> {
private List<Map<String, String>> tags = Collections.emptyList();
private List<Event> events = Collections.emptyList();
public List<Map<String, String>> tags() {
return tags;
}
public List<Event> events() {
return events;
}
public AlarmQuery tags(List<Map<String, String>> tags) {
this.tags = tags;
return this;
}
public AlarmQuery events(List<Event> events) {
this.events = events;
return this;
}
public AlarmQuery addTag(String key, String value) {
if (Collections.EMPTY_LIST.equals(tags)) {
tags = new ArrayList<>();
......@@ -47,4 +59,13 @@ public class AlarmQuery extends AbstractQuery<AlarmQuery> {
tags.add(tag);
return this;
}
public AlarmQuery addEvents(List<Event> events) {
if (Collections.EMPTY_LIST.equals(events)) {
events = new ArrayList<>();
}
events.addAll(events);
return this;
}
}
......@@ -29,6 +29,19 @@
tags {
key, value
}
events {
name
source {
service serviceInstance endpoint
}
startTime
endTime
message
parameters {
key value
}
uuid
}
}
}}",
"variables":{
......
......@@ -17,6 +17,8 @@
package org.apache.skywalking.e2e.alarm;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.e2e.annotation.ContainerHostAndPort;
import org.apache.skywalking.e2e.annotation.DockerCompose;
......@@ -35,9 +37,6 @@ import org.junit.jupiter.api.TestMethodOrder;
import org.springframework.http.ResponseEntity;
import org.testcontainers.containers.DockerComposeContainer;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.skywalking.e2e.utils.Times.now;
import static org.apache.skywalking.e2e.utils.Yamls.load;
......
......@@ -24,6 +24,9 @@ matchers:
value: CRITICAL
- key: receivers
value: zhangsan
events:
- source:
service: e2e-service-provider
- startTime: gt 0
scope: Service
id: ZTJlLXNlcnZpY2UtcHJvdmlkZXI=.1
......@@ -33,4 +36,6 @@ matchers:
value: CRITICAL
- key: receivers
value: zhangsan
events:
- source:
service: e2e-service-provider
......@@ -24,6 +24,9 @@ matchers:
value: WARNING
- key: receivers
value: lisi
events:
- source:
service: e2e-service-provider
- startTime: gt 0
scope: Service
id: ZTJlLXNlcnZpY2UtcHJvdmlkZXI=.1
......@@ -33,3 +36,6 @@ matchers:
value: WARNING
- key: receivers
value: lisi
events:
- source:
service: e2e-service-provider
......@@ -24,3 +24,6 @@ matchers:
value: CRITICAL
- key: receivers
value: zhangsan
events:
- source:
service: e2e-service-provider
\ No newline at end of file
......@@ -24,3 +24,6 @@ matchers:
value: WARNING
- key: receivers
value: lisi
events:
- source:
service: e2e-service-provider
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册