Unverified commit 25ebd6c2, authored by Jiajing LU, committed by GitHub

Add Banyandb storage e2e (#9112)

Parent 8b40d226
@@ -285,6 +285,8 @@ jobs:
   - name: Agent Lua
     config: test/e2e-v2/cases/lua/e2e.yaml
+  - name: BanyanDB
+    config: test/e2e-v2/cases/storage/banyandb/e2e.yaml
   - name: Storage H2
     config: test/e2e-v2/cases/storage/h2/e2e.yaml
   - name: Storage MySQL
@@ -398,6 +400,8 @@ jobs:
     config: test/e2e-v2/cases/log/fluent-bit/e2e.yaml
     env: ES_VERSION=7.15.0
+  - name: Trace Profiling BanyanDB
+    config: test/e2e-v2/cases/profiling/trace/banyandb/e2e.yaml
   - name: Trace Profiling H2
     config: test/e2e-v2/cases/profiling/trace/h2/e2e.yaml
   - name: Trace Profiling ES
@@ -405,6 +409,8 @@ jobs:
   - name: Trace Profiling MySQL
     config: test/e2e-v2/cases/profiling/trace/mysql/e2e.yaml
+  - name: eBPF Profiling BanyanDB
+    config: test/e2e-v2/cases/profiling/ebpf/banyandb/e2e.yaml
   - name: eBPF Profiling H2
     config: test/e2e-v2/cases/profiling/ebpf/h2/e2e.yaml
   - name: eBPF Profiling ES
......
@@ -121,13 +121,13 @@ public class ProfileTaskCache implements Service {
      * use for every db query
      */
     public long getCacheStartTimeBucket() {
-        return TimeBucket.getRecordTimeBucket(System.currentTimeMillis());
+        return TimeBucket.getMinuteTimeBucket(System.currentTimeMillis());
     }
     /**
      * use for every db query, +10 start time and +15 end time(because use task end time to search)
      */
     public long getCacheEndTimeBucket() {
-        return TimeBucket.getRecordTimeBucket(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(25));
+        return TimeBucket.getMinuteTimeBucket(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(25));
     }
 }
@@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit;
 import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.oap.server.network.constants.ProfileConstants;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
-import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamProcessor;
 import org.apache.skywalking.oap.server.core.query.type.ProfileTaskCreationResult;
@@ -93,7 +92,7 @@ public class ProfileTaskMutationService implements Service {
         task.setDumpPeriod(dumpPeriod);
         task.setCreateTime(createTime);
         task.setMaxSamplingCount(maxSamplingCount);
-        task.setTimeBucket(TimeBucket.getRecordTimeBucket(taskEndTime));
+        task.setTimeBucket(TimeBucket.getMinuteTimeBucket(taskStartTime));
         NoneStreamProcessor.getInstance().in(task);
         return ProfileTaskCreationResult.builder().id(task.id()).build();
@@ -138,8 +137,8 @@ public class ProfileTaskMutationService implements Service {
         }
         // Each service can monitor up to 1 endpoints during the execution of tasks
-        long startTimeBucket = TimeBucket.getTimeBucket(monitorStartTime, DownSampling.Second);
-        long endTimeBucket = TimeBucket.getTimeBucket(monitorEndTime, DownSampling.Second);
+        long startTimeBucket = TimeBucket.getMinuteTimeBucket(monitorStartTime);
+        long endTimeBucket = TimeBucket.getMinuteTimeBucket(monitorEndTime);
         final List<ProfileTask> alreadyHaveTaskList = getProfileTaskDAO().getTaskList(
             serviceId, null, startTimeBucket, endTimeBucket, 1);
         if (CollectionUtils.isNotEmpty(alreadyHaveTaskList)) {
......
@@ -240,7 +240,7 @@ storage:
     maxBulkSize: ${SW_STORAGE_BANYANDB_MAX_BULK_SIZE:5000}
     flushInterval: ${SW_STORAGE_BANYANDB_FLUSH_INTERVAL:15}
     concurrentWriteThreads: ${SW_STORAGE_BANYANDB_CONCURRENT_WRITE_THREADS:15}
-    fetchTaskLogMaxSize: ${SW_STORAGE_BANYANDB_FETCH_TASK_LOG_MAX_SIZE:1000} # the max number of fetch task log in a request
+    profileTaskQueryMaxSize: ${SW_STORAGE_BANYANDB_PROFILE_TASK_QUERY_MAX_SIZE:200} # the max number of fetch task in a request
 agent-analyzer:
   selector: ${SW_AGENT_ANALYZER:default}
......
@@ -44,8 +44,8 @@ public class BanyanDBStorageConfig extends ModuleConfig {
      */
     private int concurrentWriteThreads = 2;
     /**
-     * Max size of {@link org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog} to be fetched
+     * Max size of {@link org.apache.skywalking.oap.server.core.query.type.ProfileTask} to be fetched
      * in a single request.
      */
-    private int fetchTaskLogMaxSize;
+    private int profileTaskQueryMaxSize = 200;
 }
@@ -113,9 +113,9 @@ public class BanyanDBStorageProvider extends ModuleProvider {
         this.registerServiceImplementation(IMetadataQueryDAO.class, new BanyanDBMetadataQueryDAO(client));
         this.registerServiceImplementation(IAlarmQueryDAO.class, new BanyanDBAlarmQueryDAO(client));
         this.registerServiceImplementation(ILogQueryDAO.class, new BanyanDBLogQueryDAO(client));
-        this.registerServiceImplementation(IProfileTaskQueryDAO.class, new BanyanDBProfileTaskQueryDAO(client));
-        this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new BanyanDBProfileTaskLogQueryDAO(client, this.config.getFetchTaskLogMaxSize()));
-        this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new BanyanDBProfileThreadSnapshotQueryDAO(client));
+        this.registerServiceImplementation(IProfileTaskQueryDAO.class, new BanyanDBProfileTaskQueryDAO(client, this.config.getProfileTaskQueryMaxSize()));
+        this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new BanyanDBProfileTaskLogQueryDAO(client, this.config.getProfileTaskQueryMaxSize()));
+        this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new BanyanDBProfileThreadSnapshotQueryDAO(client, this.config.getProfileTaskQueryMaxSize()));
         this.registerServiceImplementation(UITemplateManagementDAO.class, new BanyanDBUITemplateManagementDAO(client));
         this.registerServiceImplementation(IEventQueryDAO.class, new BanyanDBEventQueryDAO(client));
         this.registerServiceImplementation(ITopologyQueryDAO.class, new BanyanDBTopologyQueryDAO(client));
......
@@ -28,6 +28,7 @@ import org.apache.skywalking.oap.server.core.analysis.manual.relation.endpoint.E
 import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
 import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
 import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.query.type.Call;
 import org.apache.skywalking.oap.server.core.source.DetectPoint;
 import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
@@ -106,19 +107,21 @@ public class BanyanDBTopologyQueryDAO extends AbstractBanyanDBDAO implements ITo
         if (startTB > 0 && endTB > 0) {
             timestampRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
         }
+        final String modelName = detectPoint == DetectPoint.SERVER ? ServiceRelationServerSideMetrics.INDEX_NAME :
+                ServiceRelationClientSideMetrics.INDEX_NAME;
         final Map<String, Call.CallDetail> callMap = new HashMap<>();
         for (final QueryBuilder<MeasureQuery> q : queryBuilderList) {
-            MeasureQueryResponse resp = query(ServiceRelationClientSideMetrics.INDEX_NAME,
+            MeasureQueryResponse resp = query(modelName,
                     ImmutableSet.of(ServiceRelationClientSideMetrics.COMPONENT_ID,
                             ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID,
                             ServiceRelationClientSideMetrics.DEST_SERVICE_ID,
-                            ServiceRelationClientSideMetrics.ENTITY_ID),
+                            Metrics.ENTITY_ID),
                     Collections.emptySet(), timestampRange, q);
             if (resp.size() == 0) {
                 continue;
             }
             final Call.CallDetail call = new Call.CallDetail();
-            final String entityId = resp.getDataPoints().get(0).getTagValue(ServiceRelationClientSideMetrics.ENTITY_ID);
+            final String entityId = resp.getDataPoints().get(0).getTagValue(Metrics.ENTITY_ID);
             final int componentId = ((Number) resp.getDataPoints().get(0).getTagValue(ServiceRelationClientSideMetrics.COMPONENT_ID)).intValue();
             call.buildFromServiceRelation(entityId, componentId, detectPoint);
             callMap.putIfAbsent(entityId, call);
@@ -163,19 +166,21 @@ public class BanyanDBTopologyQueryDAO extends AbstractBanyanDBDAO implements ITo
         if (startTB > 0 && endTB > 0) {
             timestampRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
         }
+        final String modelName = detectPoint == DetectPoint.SERVER ? ServiceInstanceRelationServerSideMetrics.INDEX_NAME :
+                ServiceRelationClientSideMetrics.INDEX_NAME;
         final Map<String, Call.CallDetail> callMap = new HashMap<>();
         for (final QueryBuilder<MeasureQuery> q : queryBuilderList) {
-            MeasureQueryResponse resp = query(ServiceInstanceRelationServerSideMetrics.INDEX_NAME,
+            MeasureQueryResponse resp = query(modelName,
                     ImmutableSet.of(ServiceInstanceRelationServerSideMetrics.COMPONENT_ID,
                             ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
                             ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
-                            ServiceInstanceRelationServerSideMetrics.ENTITY_ID),
+                            Metrics.ENTITY_ID),
                     Collections.emptySet(), timestampRange, q);
             if (resp.size() == 0) {
                 continue;
             }
             final Call.CallDetail call = new Call.CallDetail();
-            final String entityId = resp.getDataPoints().get(0).getTagValue(ServiceInstanceRelationServerSideMetrics.ENTITY_ID);
+            final String entityId = resp.getDataPoints().get(0).getTagValue(Metrics.ENTITY_ID);
             final int componentId = ((Number) resp.getDataPoints().get(0).getTagValue(ServiceRelationClientSideMetrics.COMPONENT_ID)).intValue();
             call.buildFromInstanceRelation(entityId, componentId, detectPoint);
             callMap.putIfAbsent(entityId, call);
@@ -217,13 +222,13 @@ public class BanyanDBTopologyQueryDAO extends AbstractBanyanDBDAO implements ITo
             MeasureQueryResponse resp = query(EndpointRelationServerSideMetrics.INDEX_NAME,
                     ImmutableSet.of(EndpointRelationServerSideMetrics.DEST_ENDPOINT,
                             EndpointRelationServerSideMetrics.SOURCE_ENDPOINT,
-                            EndpointRelationServerSideMetrics.ENTITY_ID),
+                            Metrics.ENTITY_ID),
                     Collections.emptySet(), timestampRange, q);
             if (resp.size() == 0) {
                 continue;
             }
             final Call.CallDetail call = new Call.CallDetail();
-            final String entityId = resp.getDataPoints().get(0).getTagValue(EndpointRelationServerSideMetrics.ENTITY_ID);
+            final String entityId = resp.getDataPoints().get(0).getTagValue(Metrics.ENTITY_ID);
             call.buildFromEndpointRelation(entityId, detectPoint);
             callMap.putIfAbsent(entityId, call);
         }
......
@@ -29,7 +29,6 @@ import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTas
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
@@ -43,9 +42,10 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen
     private final int queryMaxSize;
-    public BanyanDBProfileTaskLogQueryDAO(BanyanDBStorageClient client, int queryMaxSize) {
+    public BanyanDBProfileTaskLogQueryDAO(BanyanDBStorageClient client, int profileTaskQueryMaxSize) {
         super(client);
-        this.queryMaxSize = queryMaxSize;
+        // query log size use profile task query max size * per log count
+        this.queryMaxSize = profileTaskQueryMaxSize * 50;
     }
     @Override
@@ -58,10 +58,6 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen
             }
         });
-        if (resp.size() == 0) {
-            return Collections.emptyList();
-        }
         final LinkedList<ProfileTaskLog> tasks = new LinkedList<>();
         for (final RowEntity rowEntity : resp.getElements()) {
             tasks.add(buildProfileTaskLog(rowEntity));
......
@@ -19,9 +19,11 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 import com.google.common.collect.ImmutableSet;
+import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.profiling.trace.ProfileTaskRecord;
 import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
 import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
@@ -43,11 +45,15 @@ public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO implements
             ProfileTaskRecord.DURATION,
             ProfileTaskRecord.MIN_DURATION_THRESHOLD,
             ProfileTaskRecord.DUMP_PERIOD,
-            ProfileTaskRecord.MAX_SAMPLING_COUNT
+            ProfileTaskRecord.MAX_SAMPLING_COUNT,
+            Metrics.TIME_BUCKET
     );
-    public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client) {
+    private final int queryMaxSize;
+    public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client, int queryMaxSize) {
         super(client);
+        this.queryMaxSize = queryMaxSize;
     }
     @Override
@@ -63,14 +69,17 @@ public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO implements
             query.and(eq(ProfileTaskRecord.ENDPOINT_NAME, endpointName));
         }
         if (startTimeBucket != null) {
-            query.and(gte(ProfileTaskRecord.TIME_BUCKET, startTimeBucket));
+            query.and(gte(Metrics.TIME_BUCKET, startTimeBucket));
         }
         if (endTimeBucket != null) {
-            query.and(lte(ProfileTaskRecord.TIME_BUCKET, endTimeBucket));
+            query.and(lte(Metrics.TIME_BUCKET, endTimeBucket));
         }
         if (limit != null) {
             query.setLimit(limit);
+        } else {
+            query.setLimit(BanyanDBProfileTaskQueryDAO.this.queryMaxSize);
         }
+        query.setOrderBy(new AbstractQuery.OrderBy(ProfileTaskRecord.START_TIME, AbstractQuery.Sort.DESC));
     }
 });
......
@@ -73,11 +73,14 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
             SegmentRecord.TIME_BUCKET,
             SegmentRecord.DATA_BINARY);
+    private final int querySegmentMaxSize;
     protected final ProfileThreadSnapshotRecord.Builder builder =
             new ProfileThreadSnapshotRecord.Builder();
-    public BanyanDBProfileThreadSnapshotQueryDAO(BanyanDBStorageClient client) {
+    public BanyanDBProfileThreadSnapshotQueryDAO(BanyanDBStorageClient client, int profileTaskQueryMaxSize) {
         super(client);
+        this.querySegmentMaxSize = profileTaskQueryMaxSize;
     }
     @Override
@@ -89,6 +92,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
     public void apply(StreamQuery query) {
         query.and(eq(ProfileThreadSnapshotRecord.TASK_ID, taskId))
                 .and(eq(ProfileThreadSnapshotRecord.SEQUENCE, 0L));
+        query.setLimit(querySegmentMaxSize);
     }
 });
......
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: '2.1'
services:
banyandb:
image: "ghcr.io/apache/skywalking-banyandb:${SW_BANYANDB_COMMIT}"
networks:
- e2e
expose:
- 17912
command: standalone --stream-root-path /tmp/stream-data --measure-root-path /tmp/measure-data
healthcheck:
test: [ "CMD", "sh", "-c", "nc -nz 127.0.0.1 17912" ]
interval: 5s
timeout: 60s
retries: 120
oap:
extends:
file: ../docker-compose.yml
service: oap
networks:
- e2e
environment:
SW_STORAGE: banyandb
SW_STORAGE_BANYANDB_HOST: "banyandb"
depends_on:
banyandb:
condition: service_healthy
ports:
- 12800
sqrt:
extends:
file: ../docker-compose.yml
service: sqrt
networks:
- e2e
rover:
extends:
file: ../docker-compose.yml
service: rover
networks:
- e2e
depends_on:
oap:
condition: service_healthy
networks:
e2e:
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is used to show how to write configuration files and can be used to test.
setup:
env: compose
file: docker-compose.yml
timeout: 20m
init-system-environment: ../../../../script/env
steps:
- name: set PATH
command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH
- name: install yq
command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
- name: install swctl
command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
verify:
retry:
count: 20
interval: 10s
cases:
- includes:
- ../profiling-cases.yaml
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: '3.8'
services:
banyandb:
image: "ghcr.io/apache/skywalking-banyandb:${SW_BANYANDB_COMMIT}"
networks:
- e2e
expose:
- 17912
command: standalone --stream-root-path /tmp/stream-data --measure-root-path /tmp/measure-data
healthcheck:
test: [ "CMD", "sh", "-c", "nc -nz 127.0.0.1 17912" ]
interval: 5s
timeout: 60s
retries: 120
provider:
extends:
file: ../../../../script/docker-compose/base-compose.yml
service: provider
depends_on:
oap:
condition: service_healthy
ports:
- 9090
oap:
extends:
file: ../../../../script/docker-compose/base-compose.yml
service: oap
environment:
SW_STORAGE: banyandb
SW_STORAGE_BANYANDB_HOST: "banyandb"
depends_on:
banyandb:
condition: service_healthy
ports:
- 12800
networks:
e2e:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is used to show how to write configuration files and can be used to test.
setup:
env: compose
file: docker-compose.yml
timeout: 20m
init-system-environment: ../../../../script/env
steps:
- name: set PATH
command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH
- name: install yq
command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
- name: install swctl
command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
verify:
retry:
count: 20
interval: 3s
cases:
- includes:
- ../profiling-cases.yaml
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: '2.1'
services:
banyandb:
image: "ghcr.io/apache/skywalking-banyandb:${SW_BANYANDB_COMMIT}"
networks:
- e2e
expose:
- 17912
command: standalone --stream-root-path /tmp/stream-data --measure-root-path /tmp/measure-data
healthcheck:
test: ["CMD", "sh", "-c", "nc -nz 127.0.0.1 17912"]
interval: 5s
timeout: 60s
retries: 120
oap:
extends:
file: ../../../script/docker-compose/base-compose.yml
service: oap
environment:
SW_STORAGE: banyandb
SW_STORAGE_BANYANDB_HOST: "banyandb"
ports:
- 12800
depends_on:
banyandb:
condition: service_healthy
provider:
extends:
file: ../../../script/docker-compose/base-compose.yml
service: provider
ports:
- 9090
networks:
- e2e
depends_on:
oap:
condition: service_healthy
consumer:
extends:
file: ../../../script/docker-compose/base-compose.yml
service: consumer
ports:
- 9092
depends_on:
oap:
condition: service_healthy
provider:
condition: service_healthy
networks:
e2e:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is used to show how to write configuration files and can be used to test.
setup:
env: compose
file: docker-compose.yml
timeout: 20m
init-system-environment: ../../../script/env
steps:
- name: set PATH
command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH
- name: install yq
command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
- name: install swctl
command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
trigger:
action: http
interval: 3s
times: 10
url: http://${consumer_host}:${consumer_9092}/info
method: POST
verify:
# verify with retry strategy
retry:
# max retry count
count: 20
# the interval between two retries, in millisecond.
interval: 10s
cases:
- includes:
- ../storage-cases.yaml
@@ -23,5 +23,6 @@ SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
 SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
 SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5
 SW_ROVER_COMMIT=90c93c706743aac1f5853b677730edae8cc32a2c
+SW_BANYANDB_COMMIT=f868a5dcd881c21dbf044013228994f49f15bd83
 SW_CTL_COMMIT=219876daf985fd474955834ef0b65013f0890e96