提交 4cf0600f 编写于 作者: 静夜思朝颜's avatar 静夜思朝颜 提交者: wu-sheng

Provide profile task downstream to sniffer (#4172)

* Provide profile task downstream to sniffer

* fix agent unit testcase issue

* add profile switch config on sniffer

* fix es error

* 1. add @DefaultImplementor on the sniffer profile task service
2. change ProfileTaskExecutionService#PROFILE_TASK_READY_SCHEDULE to final and remove volatile
2. fix style error

* change timeFromStartMills use `<` to compare

* 1. add `maxSamplingCount` to profile task
2. make profile task limit to the common package

* 1. change `agent.active_profile` to `profile.active` and make true on default
2. add `maxSamplingCount` in profile task
3. use `createTime` to check has new command list
4. add task re-check before process profile task

* 1. add `profile-receiver` document
2. change `ProfileTaskExecutionService` use single schedule thread pool
3. cache dont need fetch data when no data, use auto-fresh mechanism only

* remove navigate time judge, fix comment wrong meaning
上级 c6b16954
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.network.constants;
/**
* profile task limit constants
*
* @author MrPro
*/
public class ProfileConstants {
/**
* Monitor duration must greater than 1 minutes
*/
public static final int TASK_DURATION_MIN_MINUTE = 1;
/**
* The duration of the monitoring task cannot be greater than 15 minutes
*/
public static final int TASK_DURATION_MAX_MINUTE = 15;
/**
* Dump period must be greater than or equals 10 milliseconds
*/
public static final int TASK_DUMP_PERIOD_MIN_MILLIS = 10;
/**
* Max sampling count must less than 10
*/
public static final int TASK_MAX_SAMPLING_COUNT = 10;
}
......@@ -28,6 +28,8 @@ public class CommandDeserializer {
final String commandName = command.getCommand();
if (ServiceResetCommand.NAME.equals(commandName)) {
return ServiceResetCommand.DESERIALIZER.deserialize(command);
} else if (ProfileTaskCommand.NAME.equals(commandName)) {
return ProfileTaskCommand.DESERIALIZER.deserialize(command);
}
throw new UnsupportedCommandException(command);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.network.trace.component.command;
import org.apache.skywalking.apm.network.common.Command;
import org.apache.skywalking.apm.network.common.KeyStringValuePair;
import java.util.List;
/**
* @author MrPro
*/
public class ProfileTaskCommand extends BaseCommand implements Serializable, Deserializable<ProfileTaskCommand> {
public static final Deserializable<ProfileTaskCommand> DESERIALIZER = new ProfileTaskCommand("", "", 0, 0, 0, 0, 0, 0);
public static final String NAME = "ProfileTaskQuery";
// profile task data
private String endpointName;
private int duration;
private int minDurationThreshold;
private int dumpPeriod;
private int maxSamplingCount;
private long startTime;
private long createTime;
public ProfileTaskCommand(String serialNumber, String endpointName, int duration, int minDurationThreshold, int dumpPeriod, int maxSamplingCount, long startTime, long createTime) {
super(NAME, serialNumber);
this.endpointName = endpointName;
this.duration = duration;
this.minDurationThreshold = minDurationThreshold;
this.dumpPeriod = dumpPeriod;
this.maxSamplingCount = maxSamplingCount;
this.startTime = startTime;
this.createTime = createTime;
}
@Override
public ProfileTaskCommand deserialize(Command command) {
final List<KeyStringValuePair> argsList = command.getArgsList();
String serialNumber = null;
String endpointName = null;
int duration = 0;
int minDurationThreshold = 0;
int dumpPeriod = 0;
int maxSamplingCount = 0;
long startTime = 0;
long createTime = 0;
for (final KeyStringValuePair pair : argsList) {
if ("SerialNumber".equals(pair.getKey())) {
serialNumber = pair.getValue();
} else if ("EndpointName".equals(pair.getKey())) {
endpointName = pair.getValue();
} else if ("Duration".equals(pair.getKey())) {
duration = Integer.parseInt(pair.getValue());
} else if ("MinDurationThreshold".equals(pair.getKey())) {
minDurationThreshold = Integer.parseInt(pair.getValue());
} else if ("DumpPeriod".equals(pair.getKey())) {
dumpPeriod = Integer.parseInt(pair.getValue());
} else if ("MaxSamplingCount".equals(pair.getKey())) {
maxSamplingCount = Integer.parseInt(pair.getValue());
} else if ("StartTime".equals(pair.getKey())) {
startTime = Long.parseLong(pair.getValue());
} else if ("CreateTime".equals(pair.getKey())) {
createTime = Long.parseLong(pair.getValue());
}
}
return new ProfileTaskCommand(serialNumber, endpointName, duration, minDurationThreshold, dumpPeriod, maxSamplingCount, startTime, createTime);
}
@Override
public Command.Builder serialize() {
final Command.Builder builder = commandBuilder();
builder.addArgs(KeyStringValuePair.newBuilder().setKey("EndpointName").setValue(endpointName))
.addArgs(KeyStringValuePair.newBuilder().setKey("Duration").setValue(String.valueOf(duration)))
.addArgs(KeyStringValuePair.newBuilder().setKey("MinDurationThreshold").setValue(String.valueOf(minDurationThreshold)))
.addArgs(KeyStringValuePair.newBuilder().setKey("DumpPeriod").setValue(String.valueOf(dumpPeriod)))
.addArgs(KeyStringValuePair.newBuilder().setKey("MaxSamplingCount").setValue(String.valueOf(maxSamplingCount)))
.addArgs(KeyStringValuePair.newBuilder().setKey("StartTime").setValue(String.valueOf(startTime)))
.addArgs(KeyStringValuePair.newBuilder().setKey("CreateTime").setValue(String.valueOf(createTime)));
return builder;
}
public String getEndpointName() {
return endpointName;
}
public int getDuration() {
return duration;
}
public int getMinDurationThreshold() {
return minDurationThreshold;
}
public int getDumpPeriod() {
return dumpPeriod;
}
public int getMaxSamplingCount() {
return maxSamplingCount;
}
public long getStartTime() {
return startTime;
}
public long getCreateTime() {
return createTime;
}
}
Subproject commit c7113782a74858bae14ade4abc13653f26bf304a
Subproject commit 8f897e825d3c27dd661b255fb59096932705894c
......@@ -20,8 +20,10 @@ package org.apache.skywalking.apm.agent.core.commands;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.commands.executor.NoopCommandExecutor;
import org.apache.skywalking.apm.agent.core.commands.executor.ProfileTaskCommandExecutor;
import org.apache.skywalking.apm.agent.core.commands.executor.ServiceResetCommandExecutor;
import org.apache.skywalking.apm.network.trace.component.command.BaseCommand;
import org.apache.skywalking.apm.network.trace.component.command.ProfileTaskCommand;
import org.apache.skywalking.apm.network.trace.component.command.ServiceResetCommand;
import java.util.HashMap;
......@@ -49,6 +51,9 @@ public class CommandExecutorService implements BootService, CommandExecutor {
// Register all the supported commands with their executors here
commandExecutorMap.put(ServiceResetCommand.NAME, new ServiceResetCommandExecutor());
// Profile task executor
commandExecutorMap.put(ProfileTaskCommand.NAME, new ProfileTaskCommandExecutor());
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.commands.executor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.commands.CommandExecutionException;
import org.apache.skywalking.apm.agent.core.commands.CommandExecutor;
import org.apache.skywalking.apm.agent.core.profile.ProfileTask;
import org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService;
import org.apache.skywalking.apm.network.trace.component.command.BaseCommand;
import org.apache.skywalking.apm.network.trace.component.command.ProfileTaskCommand;
/**
* Command executor that executes the {@link ProfileTaskCommand} command
*
* @author MrPro
*/
public class ProfileTaskCommandExecutor implements CommandExecutor {
@Override
public void execute(BaseCommand command) throws CommandExecutionException {
final ProfileTaskCommand profileTaskCommand = (ProfileTaskCommand) command;
// build profile task
final ProfileTask profileTask = new ProfileTask();
profileTask.setEndpointName(profileTaskCommand.getEndpointName());
profileTask.setDuration(profileTaskCommand.getDuration());
profileTask.setMinDurationThreshold(profileTaskCommand.getMinDurationThreshold());
profileTask.setThreadDumpPeriod(profileTaskCommand.getDumpPeriod());
profileTask.setMaxSamplingCount(profileTaskCommand.getMaxSamplingCount());
profileTask.setStartTime(profileTaskCommand.getStartTime());
profileTask.setCreateTime(profileTaskCommand.getCreateTime());
// send to executor
ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class).addProfileTask(profileTask);
}
}
......@@ -137,6 +137,17 @@ public class Config {
* How long grpc client will timeout in sending data to upstream.
*/
public static int GRPC_UPSTREAM_TIMEOUT = 30;
/**
* Get profile task list interval
*/
public static int GET_PROFILE_TASK_INTERVAL = 20;
}
public static class Profile {
/**
* If true, skywalking agent will enable profile when user create a new profile task. Otherwise disable profile.
*/
public static boolean ACTIVE = true;
}
public static class Jvm {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.profile;
import java.util.Objects;
/**
* Profile task bean, receive from OAP server
*
* @author MrPro
*/
public class ProfileTask {
// monitor endpoint name
private String endpointName;
// task duration (minute)
private int duration;
// trace start monitoring time (ms)
private int minDurationThreshold;
// thread dump period (ms)
private int threadDumpPeriod;
// max number of traces monitor on the sniffer
private int maxSamplingCount;
// task start time
private long startTime;
// task create time
private long createTime;
public String getEndpointName() {
return endpointName;
}
public void setEndpointName(String endpointName) {
this.endpointName = endpointName;
}
public int getDuration() {
return duration;
}
public void setDuration(int duration) {
this.duration = duration;
}
public int getMinDurationThreshold() {
return minDurationThreshold;
}
public void setMinDurationThreshold(int minDurationThreshold) {
this.minDurationThreshold = minDurationThreshold;
}
public int getThreadDumpPeriod() {
return threadDumpPeriod;
}
public void setThreadDumpPeriod(int threadDumpPeriod) {
this.threadDumpPeriod = threadDumpPeriod;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public int getMaxSamplingCount() {
return maxSamplingCount;
}
public void setMaxSamplingCount(int maxSamplingCount) {
this.maxSamplingCount = maxSamplingCount;
}
public long getCreateTime() {
return createTime;
}
public void setCreateTime(long createTime) {
this.createTime = createTime;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ProfileTask that = (ProfileTask) o;
return duration == that.duration &&
minDurationThreshold == that.minDurationThreshold &&
threadDumpPeriod == that.threadDumpPeriod &&
maxSamplingCount == that.maxSamplingCount &&
startTime == that.startTime &&
createTime == that.createTime &&
endpointName.equals(that.endpointName);
}
@Override
public int hashCode() {
return Objects.hash(endpointName, duration, minDurationThreshold, threadDumpPeriod, maxSamplingCount, startTime, createTime);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.profile;
import java.util.Objects;
/**
* profile task execution context, it will create on process this profile task
*
* @author MrPro
*/
public class ProfileTaskExecutionContext {
// task data
private final ProfileTask task;
// task real start time
private final long startTime;
public ProfileTaskExecutionContext(ProfileTask task, long startTime) {
this.task = task;
this.startTime = startTime;
}
public ProfileTask getTask() {
return task;
}
public long getStartTime() {
return startTime;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ProfileTaskExecutionContext that = (ProfileTaskExecutionContext) o;
return Objects.equals(task, that.task);
}
@Override
public int hashCode() {
return Objects.hash(task);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.profile;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.network.constants.ProfileConstants;
import org.apache.skywalking.apm.util.StringUtil;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
/**
* Profile task executor, use {@link #addProfileTask(ProfileTask)} to add a new profile task.
*
* @author MrPro
*/
@DefaultImplementor
public class ProfileTaskExecutionService implements BootService {
private static final ILog logger = LogManager.getLogger(ProfileTaskExecutionService.class);
// add a schedule while waiting for the task to start or finish
private final static ScheduledExecutorService PROFILE_TASK_SCHEDULE = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("PROFILE-TASK-SCHEDULE"));
// last command create time, use to next query task list
private volatile long lastCommandCreateTime = -1;
// current processing profile task context
private final AtomicReference<ProfileTaskExecutionContext> taskExecutionContext = new AtomicReference<>();
// profile task list, include running and waiting running tasks
private final List<ProfileTask> profileTaskList = Collections.synchronizedList(new LinkedList<>());
/**
* get profile task from OAP
* @param task
*/
public void addProfileTask(ProfileTask task) {
// update last command create time
if (task.getCreateTime() > lastCommandCreateTime) {
lastCommandCreateTime = task.getCreateTime();
}
// check profile task limit
final String dataError = checkProfileTaskSuccess(task);
if (dataError != null) {
logger.warn("check command error, cannot process this profile task. reason: {}", dataError);
return;
}
// add task to list
profileTaskList.add(task);
// schedule to start task
long timeToProcessMills = task.getStartTime() - System.currentTimeMillis();
PROFILE_TASK_SCHEDULE.schedule(new Runnable() {
@Override
public void run() {
processProfileTask(task);
}
}, timeToProcessMills, TimeUnit.MILLISECONDS);
}
/**
* active the selected profile task to execution task, and start a removal task for it.
* @param task
*/
private synchronized void processProfileTask(ProfileTask task) {
// make sure prev profile task already stopped
stopCurrentProfileTask(taskExecutionContext.get());
// make stop task schedule and task context
// TODO process task on next step
final ProfileTaskExecutionContext currentStartedTaskContext = new ProfileTaskExecutionContext(task, System.currentTimeMillis());
taskExecutionContext.set(currentStartedTaskContext);
PROFILE_TASK_SCHEDULE.schedule(new Runnable() {
@Override
public void run() {
stopCurrentProfileTask(currentStartedTaskContext);
}
}, task.getDuration(), TimeUnit.MINUTES);
}
/**
* stop profile task, remove context data
*/
private synchronized void stopCurrentProfileTask(ProfileTaskExecutionContext needToStop) {
// stop same context only
if (needToStop == null || !taskExecutionContext.compareAndSet(needToStop, null)) {
return;
}
// remove task
profileTaskList.remove(needToStop.getTask());
// TODO notify OAP current profile task execute finish
}
@Override
public void prepare() throws Throwable {
}
@Override
public void boot() throws Throwable {
}
@Override
public void onComplete() throws Throwable {
}
@Override
public void shutdown() throws Throwable {
PROFILE_TASK_SCHEDULE.shutdown();
}
public long getLastCommandCreateTime() {
return lastCommandCreateTime;
}
/**
* check profile task data success, make the re-check, prevent receiving wrong data from database or OAP
* @param task
* @return
*/
private String checkProfileTaskSuccess(ProfileTask task) {
// endpoint name
if (StringUtil.isEmpty(task.getEndpointName())) {
return "endpoint name cannot be empty";
}
// duration
if (task.getDuration() < ProfileConstants.TASK_DURATION_MIN_MINUTE) {
return "monitor duration must greater than " + ProfileConstants.TASK_DURATION_MIN_MINUTE + " minutes";
}
if (task.getDuration() > ProfileConstants.TASK_DURATION_MAX_MINUTE) {
return "The duration of the monitoring task cannot be greater than " + ProfileConstants.TASK_DURATION_MAX_MINUTE + " minutes";
}
// min duration threshold
if (task.getMinDurationThreshold() < 0) {
return "min duration threshold must greater than or equals zero";
}
// dump period
if (task.getThreadDumpPeriod() < ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS) {
return "dump period must be greater than or equals " + ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS + " milliseconds";
}
// max sampling count
if (task.getMaxSamplingCount() <= 0) {
return "max sampling count must greater than zero";
}
if (task.getMaxSamplingCount() >= ProfileConstants.TASK_MAX_SAMPLING_COUNT) {
return "max sampling count must less than " + ProfileConstants.TASK_MAX_SAMPLING_COUNT;
}
// check task queue, check only one task in a certain time
long taskProcessFinishTime = calcProfileTaskFinishTime(task);
for (ProfileTask profileTask : profileTaskList) {
// if the end time of the task to be added is during the execution of any data, means is a error data
if (taskProcessFinishTime >= profileTask.getStartTime() && taskProcessFinishTime <= calcProfileTaskFinishTime(profileTask)) {
return "there already have processing task in time range, could not add a new task again. processing task monitor endpoint name: " + profileTask.getEndpointName();
}
}
return null;
}
private long calcProfileTaskFinishTime(ProfileTask task) {
return task.getStartTime() + TimeUnit.MINUTES.toMillis(task.getDuration());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.profile;
import io.grpc.Channel;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.commands.CommandService;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.language.profile.ProfileTaskCommandQuery;
import org.apache.skywalking.apm.network.language.profile.ProfileTaskGrpc;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
/**
* sniffer will check has new profile task list every {@link Config.Collector#GET_PROFILE_TASK_INTERVAL} second.
*
* @author MrPro
*/
@DefaultImplementor
public class ProfileTaskQueryService implements BootService, Runnable, GRPCChannelListener {
private static final ILog logger = LogManager.getLogger(ProfileTaskQueryService.class);
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
private volatile ProfileTaskGrpc.ProfileTaskBlockingStub profileTaskBlockingStub;
private volatile ScheduledFuture<?> getTaskListFuture;
@Override
public void run() {
if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue()
&& RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue()
) {
if (status == GRPCChannelStatus.CONNECTED) {
try {
ProfileTaskCommandQuery.Builder builder = ProfileTaskCommandQuery.newBuilder();
// sniffer info
builder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID).setInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
// last command create time
builder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class).getLastCommandCreateTime());
Commands commands = profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).getProfileTaskCommands(builder.build());
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
} catch (Throwable t) {
logger.error(t, "query profile task from Collector fail.", t);
}
}
}
}
@Override
public void prepare() throws Throwable {
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
}
@Override
public void boot() throws Throwable {
if (Config.Profile.ACTIVE) {
getTaskListFuture = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ProfileGetTaskService"))
.scheduleWithFixedDelay(this, 0, Config.Collector.GET_PROFILE_TASK_INTERVAL, TimeUnit.SECONDS);
}
}
@Override
public void onComplete() throws Throwable {
}
@Override
public void shutdown() throws Throwable {
if (getTaskListFuture != null) {
getTaskListFuture.cancel(true);
}
}
@Override
public void statusChanged(GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
profileTaskBlockingStub = ProfileTaskGrpc.newBlockingStub(channel);
} else {
profileTaskBlockingStub = null;
}
this.status = status;
}
}
......@@ -26,3 +26,5 @@ org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
org.apache.skywalking.apm.agent.core.commands.CommandService
org.apache.skywalking.apm.agent.core.commands.CommandExecutorService
org.apache.skywalking.apm.agent.core.context.OperationNameFormatService
org.apache.skywalking.apm.agent.core.profile.ProfileTaskQueryService
org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService
......@@ -27,6 +27,8 @@ import org.apache.skywalking.apm.agent.core.context.IgnoredTracerContext;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import org.apache.skywalking.apm.agent.core.jvm.JVMService;
import org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService;
import org.apache.skywalking.apm.agent.core.profile.ProfileTaskQueryService;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
......@@ -55,13 +57,15 @@ public class ServiceManagerTest {
public void testServiceDependencies() throws Exception {
HashMap<Class, BootService> registryService = getFieldValue(ServiceManager.INSTANCE, "bootedServices");
assertThat(registryService.size(), is(10));
assertThat(registryService.size(), is(12));
assertTraceSegmentServiceClient(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class));
assertContextManager(ServiceManager.INSTANCE.findService(ContextManager.class));
assertGRPCChannelManager(ServiceManager.INSTANCE.findService(GRPCChannelManager.class));
assertSamplingService(ServiceManager.INSTANCE.findService(SamplingService.class));
assertJVMService(ServiceManager.INSTANCE.findService(JVMService.class));
assertProfileTaskQueryService(ServiceManager.INSTANCE.findService(ProfileTaskQueryService.class));
assertProfileTaskExecuteService(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class));
assertTracingContextListener();
assertIgnoreTracingContextListener();
......@@ -83,11 +87,19 @@ public class ServiceManagerTest {
assertNotNull(service);
}
private void assertProfileTaskQueryService(ProfileTaskQueryService service) {
assertNotNull(service);
}
private void assertProfileTaskExecuteService(ProfileTaskExecutionService service) {
assertNotNull(service);
}
private void assertGRPCChannelManager(GRPCChannelManager service) throws Exception {
assertNotNull(service);
List<GRPCChannelListener> listeners = getFieldValue(service, "listeners");
assertEquals(listeners.size(), 3);
assertEquals(listeners.size(), 4);
}
private void assertSamplingService(SamplingService service) {
......
......@@ -41,6 +41,9 @@ agent.service_name=${SW_AGENT_NAME:Your_ApplicationName}
# The operationName max length
# agent.operation_name_threshold=${SW_AGENT_OPERATION_NAME_THRESHOLD:500}
# If true, skywalking agent will enable profile when user create a new profile task. Otherwise disable profile.
# profile.active=${SW_AGENT_PROFILE_ACTIVE:true}
# Backend service addresses.
collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
......
......@@ -151,6 +151,8 @@ receiver-jvm:
default:
receiver-clr:
default:
receiver-profile:
default:
service-mesh:
default:
bufferPath: ${SW_SERVICE_MESH_BUFFER_PATH:../mesh-buffer/} # Path to trace buffer files, suggest to use absolute path
......
......@@ -355,6 +355,8 @@ receiver-clr:
default:
receiver-so11y:
default:
receiver-profile:
default:
service-mesh:
default:
bufferPath: \${SW_SERVICE_MESH_BUFFER_PATH:../mesh-buffer/} # Path to trace buffer files, suggest to use absolute path
......
......@@ -13,6 +13,7 @@ We have following receivers, and `default` implementors are provided in our Apac
1. **envoy-metric**. Envoy `metrics_service` and `ALS(access log service)` supported by this receiver. OAL script support all GAUGE type metrics.
1. **receiver_zipkin**. See [details](#zipkin-receiver).
1. **receiver_jaeger**. See [details](#jaeger-receiver).
1. **receiver-profile**. gRPC services accept profile task status and snapshot reporter.
The sample settings of these receivers should be already in default `application.yml`, and also list here
```yaml
......@@ -42,6 +43,8 @@ receiver_zipkin:
host: 0.0.0.0
port: 9411
contextPath: /
receiver-profile:
default:
```
## gRPC/HTTP server for receiver
......
......@@ -90,6 +90,7 @@ property key | Description | Default |
`collector.app_and_service_register_check_interval`|application and service registry check interval.|`3`|
`collector.backend_service`|Collector SkyWalking trace receiver service addresses.|`127.0.0.1:11800`|
`collector.grpc_upstream_timeout`|How long grpc client will timeout in sending data to upstream. Unit is second.|`30` seconds|
`collector.get_profile_task_interval`|Sniffer get profile task list interval.|`20`|
`logging.level`|The log level. Default is debug.|`DEBUG`|
`logging.file_name`|Log file name.|`skywalking-api.log`|
`logging.output`| Log output. Default is FILE. Use CONSOLE means output to stdout. |`FILE`|
......@@ -102,6 +103,7 @@ property key | Description | Default |
`buffer.buffer_size`|The buffer size.|`300`|
`dictionary.service_code_buffer_size`|The buffer size of application codes and peer|`10 * 10000`|
`dictionary.endpoint_name_buffer_size`|The buffer size of endpoint names and peer|`1000 * 10000`|
`profile.active`|If true, skywalking agent will enable profile when user create a new profile task. Otherwise disable profile.|`true`|
`plugin.peer_max_length `|Peer maximum description limit.|`200`|
`plugin.mongodb.trace_param`|If true, trace all the parameters in MongoDB access, default is false. Only trace the operation, not include parameters.|`false`|
`plugin.mongodb.filter_length_limit`|If set to positive number, the `WriteRequest.params` would be truncated to this length, otherwise it would be completely saved, which may cause performance problem.|`256`|
......
......@@ -116,6 +116,11 @@
<artifactId>skywalking-so11y-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-profile-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- receiver module -->
<!-- storage module -->
......
......@@ -100,6 +100,8 @@ receiver-clr:
default:
#receiver-so11y:
# default:
receiver-profile:
default:
service-mesh:
default:
bufferPath: ${SW_SERVICE_MESH_BUFFER_PATH:../mesh-buffer/} # Path to trace buffer files, suggest to use absolute path
......
......@@ -20,10 +20,8 @@ package org.apache.skywalking.oap.server.core;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.command.CommandService;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
......@@ -82,6 +80,7 @@ public class CoreModule extends ModuleDefine {
private void addProfileService(List<Class> classes) {
classes.add(ProfileTaskMutationService.class);
classes.add(ProfileTaskQueryService.class);
classes.add(ProfileTaskCache.class);
}
private void addQueryService(List<Class> classes) {
......
......@@ -67,6 +67,11 @@ public class CoreModuleConfig extends ModuleConfig {
private long maxSizeOfEndpointInventory = 1_000_000L;
private long maxSizeOfNetworkInventory = 1_000_000L;
/**
* Following are cache setting for none stream(s)
*/
private long maxSizeOfProfileTask = 10_000L;
CoreModuleConfig() {
this.downsampling = new ArrayList<>();
}
......
......@@ -170,6 +170,7 @@ public class CoreModuleProvider extends ModuleProvider {
// add profile service implementations
this.registerServiceImplementation(ProfileTaskMutationService.class, new ProfileTaskMutationService(getManager()));
this.registerServiceImplementation(ProfileTaskQueryService.class, new ProfileTaskQueryService(getManager()));
this.registerServiceImplementation(ProfileTaskCache.class, new ProfileTaskCache(getManager(), moduleConfig));
this.registerServiceImplementation(CommandService.class, new CommandService(getManager()));
......
......@@ -18,13 +18,18 @@
package org.apache.skywalking.oap.server.core.cache;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
......@@ -56,6 +61,7 @@ public enum CacheUpdateTimer {
updateServiceInventory(moduleDefineHolder);
updateServiceInstanceInventory(moduleDefineHolder);
updateNetAddressInventory(moduleDefineHolder);
updateProfileTask(moduleDefineHolder);
}
private void updateServiceInventory(ModuleDefineHolder moduleDefineHolder) {
......@@ -115,4 +121,25 @@ public enum CacheUpdateTimer {
}
});
}
/**
* update all profile task list for each service
* @param moduleDefineHolder
*/
private void updateProfileTask(ModuleDefineHolder moduleDefineHolder) {
IProfileTaskQueryDAO profileTaskQueryDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IProfileTaskQueryDAO.class);
ProfileTaskCache profileTaskCache = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ProfileTaskCache.class);
try {
final List<ProfileTask> taskList = profileTaskQueryDAO.getTaskList(null, null, profileTaskCache.getCacheStartTimeBucket(), profileTaskCache.getCacheEndTimeBucket(), null);
taskList.stream().collect(Collectors.groupingBy(t -> t.getServiceId())).entrySet().stream().forEach(e -> {
final Integer serviceId = e.getKey();
final List<ProfileTask> profileTasks = e.getValue();
profileTaskCache.saveTaskList(serviceId, profileTasks);
});
} catch (IOException e) {
logger.warn("Unable to update profile task cache", e);
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* cache need to execute profile task
*
* @author MrPro
*/
public class ProfileTaskCache implements Service {
private static final Logger LOGGER = LoggerFactory.getLogger(ProfileTaskCache.class);
private final Cache<Integer, List<ProfileTask>> profileTaskCache;
private final ModuleManager moduleManager;
public ProfileTaskCache(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
this.moduleManager = moduleManager;
long initialSize = moduleConfig.getMaxSizeOfProfileTask() / 10L;
int initialCapacitySize = (int)(initialSize > Integer.MAX_VALUE ? Integer.MAX_VALUE : initialSize);
profileTaskCache = CacheBuilder.newBuilder().initialCapacity(initialCapacitySize).maximumSize(moduleConfig.getMaxSizeOfProfileTask())
// remove old profile task data
.expireAfterWrite(Duration.ofMinutes(1)).build();
}
/**
* query executable profile task
* @param serviceId
* @return
*/
public List<ProfileTask> getProfileTaskList(int serviceId) {
// read profile task list from cache only, use cache update timer mechanism
List<ProfileTask> profileTaskList = profileTaskCache.getIfPresent(serviceId);
return profileTaskList;
}
/**
* save service task list
* @param serviceId
* @param taskList
*/
public void saveTaskList(int serviceId, List<ProfileTask> taskList) {
if (taskList == null) {
taskList = Collections.emptyList();
}
profileTaskCache.put(serviceId, taskList);
}
/**
* use for every db query
* @return
*/
public long getCacheStartTimeBucket() {
return TimeBucket.getRecordTimeBucket(System.currentTimeMillis());
}
/**
* use for every db query, +10 start time and +15 end time(because use task end time to search)
* @return
*/
public long getCacheEndTimeBucket() {
return TimeBucket.getRecordTimeBucket(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(25));
}
}
......@@ -18,7 +18,9 @@
package org.apache.skywalking.oap.server.core.command;
import org.apache.skywalking.apm.network.trace.component.command.ProfileTaskCommand;
import org.apache.skywalking.apm.network.trace.component.command.ServiceResetCommand;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
......@@ -39,6 +41,11 @@ public class CommandService implements Service {
return new ServiceResetCommand(serialNumber);
}
public ProfileTaskCommand newProfileTaskCommand(ProfileTask task) {
final String serialNumber = UUID.randomUUID().toString();
return new ProfileTaskCommand(serialNumber, task.getEndpointName(), task.getDuration(), task.getMinDurationThreshold(), task.getDumpPeriod(), task.getMaxSamplingCount(), task.getStartTime(), task.getCreateTime());
}
private String generateSerialNumber(final int serviceInstanceId, final long time, final String serviceInstanceUUID) {
return UUID.randomUUID().toString(); // Simply generate a uuid without taking care of the parameters
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.profile;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import java.util.HashMap;
import java.util.Map;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_LOG;
/**
* profile task log database bean, use record
*
* @author MrPro
*/
@Getter
@Setter
@ScopeDeclaration(id = PROFILE_TASK_LOG, name = "ProfileTaskLog")
@Stream(name = ProfileTaskLogRecord.INDEX_NAME, scopeId = PROFILE_TASK_LOG, builder = ProfileTaskLogRecord.Builder.class, processor = RecordStreamProcessor.class)
public class ProfileTaskLogRecord extends Record {
public static final String INDEX_NAME = "profile_task_log";
public static final String TASK_ID = "task_id";
public static final String INSTANCE_ID = "instance_id";
public static final String OPERATION_TYPE = "operation_type";
public static final String OPERATION_TIME = "operation_time";
@Column(columnName = TASK_ID) private String taskId;
@Column(columnName = INSTANCE_ID) private int instanceId;
@Column(columnName = OPERATION_TYPE) private int operationType;
@Column(columnName = OPERATION_TIME) private long operationTime;
@Override
public String id() {
return getTaskId() + Const.ID_SPLIT + getInstanceId() + Const.ID_SPLIT + getOperationType() + Const.ID_SPLIT + getOperationTime();
}
public static class Builder implements StorageBuilder<ProfileTaskLogRecord> {
@Override
public ProfileTaskLogRecord map2Data(Map<String, Object> dbMap) {
final ProfileTaskLogRecord log = new ProfileTaskLogRecord();
log.setTaskId((String)dbMap.get(TASK_ID));
log.setInstanceId(((Number)dbMap.get(INSTANCE_ID)).intValue());
log.setOperationType(((Number)dbMap.get(OPERATION_TYPE)).intValue());
log.setOperationTime(((Number)dbMap.get(OPERATION_TIME)).longValue());
log.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
return log;
}
@Override
public Map<String, Object> data2Map(ProfileTaskLogRecord storageData) {
final HashMap<String, Object> map = new HashMap<>();
map.put(TASK_ID, storageData.getTaskId());
map.put(INSTANCE_ID, storageData.getInstanceId());
map.put(OPERATION_TYPE, storageData.getOperationType());
map.put(OPERATION_TIME, storageData.getOperationTime());
map.put(TIME_BUCKET, storageData.getTimeBucket());
return map;
}
}
}
......@@ -17,6 +17,7 @@
*/
package org.apache.skywalking.oap.server.core.profile;
import org.apache.skywalking.apm.network.constants.ProfileConstants;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
......@@ -60,17 +61,18 @@ public class ProfileTaskMutationService implements Service {
* @param monitorDuration monitor task duration(minute)
* @param minDurationThreshold min duration threshold
* @param dumpPeriod dump period
* @param maxSamplingCount max trace count on sniffer
* @return task create result
*/
public ProfileTaskCreationResult createTask(final int serviceId, final String endpointName, final long monitorStartTime, final int monitorDuration,
final int minDurationThreshold, final int dumpPeriod) throws IOException {
final int minDurationThreshold, final int dumpPeriod, final int maxSamplingCount) throws IOException {
// calculate task execute range
long taskStartTime = monitorStartTime > 0 ? monitorStartTime : System.currentTimeMillis();
long taskEndTime = taskStartTime + TimeUnit.MINUTES.toMillis(monitorDuration);
// check data
final String errorMessage = checkDataSuccess(serviceId, endpointName, taskStartTime, taskEndTime, monitorDuration, minDurationThreshold, dumpPeriod);
final String errorMessage = checkDataSuccess(serviceId, endpointName, taskStartTime, taskEndTime, monitorDuration, minDurationThreshold, dumpPeriod, maxSamplingCount);
if (errorMessage != null) {
return ProfileTaskCreationResult.builder().errorReason(errorMessage).build();
}
......@@ -85,6 +87,7 @@ public class ProfileTaskMutationService implements Service {
task.setMinDurationThreshold(minDurationThreshold);
task.setDumpPeriod(dumpPeriod);
task.setCreateTime(createTime);
task.setMaxSamplingCount(maxSamplingCount);
task.setTimeBucket(TimeBucket.getRecordTimeBucket(taskEndTime));
NoneStreamingProcessor.getInstance().in(task);
......@@ -92,7 +95,7 @@ public class ProfileTaskMutationService implements Service {
}
private String checkDataSuccess(final Integer serviceId, final String endpointName, final long monitorStartTime, final long monitorEndTime, final int monitorDuration,
final int minDurationThreshold, final int dumpPeriod) throws IOException {
final int minDurationThreshold, final int dumpPeriod, final int maxSamplingCount) throws IOException {
// basic check
if (serviceId == null) {
return "service cannot be null";
......@@ -100,23 +103,27 @@ public class ProfileTaskMutationService implements Service {
if (StringUtil.isEmpty(endpointName)) {
return "endpoint name cannot be empty";
}
if (monitorEndTime - monitorStartTime < TimeUnit.MINUTES.toMillis(1)) {
return "monitor duration must greater than 1 minutes";
if (monitorDuration < ProfileConstants.TASK_DURATION_MIN_MINUTE) {
return "monitor duration must greater than " + ProfileConstants.TASK_DURATION_MIN_MINUTE + " minutes";
}
if (minDurationThreshold < 0) {
return "min duration threshold must greater than or equals zero";
}
if (maxSamplingCount <= 0) {
return "max sampling count must greater than zero";
}
// check limit
// The duration of the monitoring task cannot be greater than 15 minutes
final long maxMonitorDurationInSec = TimeUnit.MINUTES.toSeconds(15);
if (monitorDuration > maxMonitorDurationInSec) {
return "The duration of the monitoring task cannot be greater than 15 minutes";
if (monitorDuration > ProfileConstants.TASK_DURATION_MAX_MINUTE) {
return "The duration of the monitoring task cannot be greater than " + ProfileConstants.TASK_DURATION_MAX_MINUTE + " minutes";
}
if (dumpPeriod < ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS) {
return "dump period must be greater than or equals " + ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS + " milliseconds";
}
// dump period must be greater than or equals 10 milliseconds
if (dumpPeriod < 10) {
return "dump period must be greater than or equals 10 milliseconds";
if (maxSamplingCount >= ProfileConstants.TASK_MAX_SAMPLING_COUNT) {
return "max sampling count must less than " + ProfileConstants.TASK_MAX_SAMPLING_COUNT;
}
// Each service can monitor up to 1 endpoints during the execution of tasks
......
......@@ -52,6 +52,7 @@ public class ProfileTaskNoneStream extends NoneStream {
public static final String MIN_DURATION_THRESHOLD = "min_duration_threshold";
public static final String DUMP_PERIOD = "dump_period";
public static final String CREATE_TIME = "create_time";
public static final String MAX_SAMPLING_COUNT = "max_sampling_count";
@Override
public String id() {
......@@ -65,6 +66,7 @@ public class ProfileTaskNoneStream extends NoneStream {
@Column(columnName = MIN_DURATION_THRESHOLD) private int minDurationThreshold;
@Column(columnName = DUMP_PERIOD) private int dumpPeriod;
@Column(columnName = CREATE_TIME) private long createTime;
@Column(columnName = MAX_SAMPLING_COUNT) private int maxSamplingCount;
public static class Builder implements StorageBuilder<ProfileTaskNoneStream> {
......@@ -79,6 +81,7 @@ public class ProfileTaskNoneStream extends NoneStream {
record.setDumpPeriod(((Number)dbMap.get(DUMP_PERIOD)).intValue());
record.setCreateTime(((Number)dbMap.get(CREATE_TIME)).longValue());
record.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
record.setMaxSamplingCount(((Number)dbMap.get(MAX_SAMPLING_COUNT)).intValue());
return record;
}
......@@ -93,6 +96,7 @@ public class ProfileTaskNoneStream extends NoneStream {
map.put(DUMP_PERIOD, storageData.getDumpPeriod());
map.put(CREATE_TIME, storageData.getCreateTime());
map.put(TIME_BUCKET, storageData.getTimeBucket());
map.put(MAX_SAMPLING_COUNT, storageData.getMaxSamplingCount());
return map;
}
}
......
......@@ -17,18 +17,25 @@
*/
package org.apache.skywalking.oap.server.core.query;
import com.google.common.base.Objects;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLog;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static java.util.Objects.isNull;
......@@ -40,14 +47,16 @@ import static java.util.Objects.isNull;
public class ProfileTaskQueryService implements Service {
private final ModuleManager moduleManager;
private IProfileTaskQueryDAO profileTaskQueryDAO;
private IProfileTaskLogQueryDAO profileTaskLogQueryDAO;
private ServiceInventoryCache serviceInventoryCache;
private ServiceInstanceInventoryCache serviceInstanceInventoryCache;
public ProfileTaskQueryService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private IProfileTaskQueryDAO getProfileTaskDAO() {
if (profileTaskQueryDAO == null) {
if (isNull(profileTaskQueryDAO)) {
this.profileTaskQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IProfileTaskQueryDAO.class);
}
return profileTaskQueryDAO;
......@@ -60,6 +69,20 @@ public class ProfileTaskQueryService implements Service {
return serviceInventoryCache;
}
public IProfileTaskLogQueryDAO getProfileTaskLogQueryDAO() {
if (isNull(profileTaskLogQueryDAO)) {
profileTaskLogQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IProfileTaskLogQueryDAO.class);
}
return profileTaskLogQueryDAO;
}
public ServiceInstanceInventoryCache getServiceInstanceInventoryCache() {
if (isNull(serviceInstanceInventoryCache)) {
serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
}
return serviceInstanceInventoryCache;
}
/**
* search profile task list
* @param serviceId monitor service
......@@ -69,14 +92,31 @@ public class ProfileTaskQueryService implements Service {
public List<ProfileTask> getTaskList(Integer serviceId, String endpointName) throws IOException {
final List<ProfileTask> tasks = getProfileTaskDAO().getTaskList(serviceId, endpointName, null, null, null);
// query all and filter on task to match logs
List<ProfileTaskLog> taskLogList = getProfileTaskLogQueryDAO().getTaskLogList(null);
if (taskLogList == null) {
taskLogList = Collections.emptyList();
}
// add service name
if (CollectionUtils.isNotEmpty(tasks)) {
final ServiceInventoryCache serviceInventoryCache = getServiceInventoryCache();
final ServiceInstanceInventoryCache serviceInstanceInventoryCache = getServiceInstanceInventoryCache();
for (ProfileTask task : tasks) {
final ServiceInventory serviceInventory = serviceInventoryCache.get(task.getServiceId());
if (serviceInventory != null) {
task.setServiceName(serviceInventory.getName());
}
// filter all task logs
task.setLogs(taskLogList.stream().filter(l -> Objects.equal(l.getTaskId(), task.getId())).map(l -> {
// get instance name from cache
final ServiceInstanceInventory instanceInventory = serviceInstanceInventoryCache.get(l.getInstanceId());
if (instanceInventory != null) {
l.setInstanceName(instanceInventory.getName());
}
return l;
}).collect(Collectors.toList()));
}
}
......
......@@ -20,6 +20,8 @@ package org.apache.skywalking.oap.server.core.query.entity;
import lombok.*;
import java.util.List;
/**
* @author MrPro
*/
......@@ -35,8 +37,12 @@ public class ProfileTask {
private String serviceName;
private String endpointName;
private long startTime;
private long createTime;
private int duration;
private int minDurationThreshold;
private int dumpPeriod;
private int maxSamplingCount;
private List<ProfileTaskLog> logs;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.query.entity;
import lombok.*;
/**
* Profile task execute log
*
* @author MrPro
*/
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class ProfileTaskLog {
private String id;
private String taskId;
// instance
private int instanceId;
private String instanceName;
// operation
private ProfileTaskLogOperationType operationType;
private long operationTime;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.query.entity;
import java.util.HashMap;
import java.util.Map;
/**
* Profile task log operation type
*
* @author MrPro
*/
public enum ProfileTaskLogOperationType {
// when sniffer has notified
NOTIFIED(1),
// when sniffer has execution finished to report
EXECUTION_FINISHED(2);
private int code;
private static final Map<Integer, ProfileTaskLogOperationType> CACHE = new HashMap<Integer, ProfileTaskLogOperationType>();
static {
for (ProfileTaskLogOperationType val :ProfileTaskLogOperationType.values()) {
CACHE.put(val.getCode(), val);
}
}
/**
* Parse opetation type by code
* @param code
* @return
*/
public static ProfileTaskLogOperationType parse(int code) {
return CACHE.get(code);
}
ProfileTaskLogOperationType(int code) {
this.code = code;
}
public int getCode() {
return this.code;
}
}
......@@ -68,6 +68,7 @@ public class DefaultScopeDefine {
public static final int JAEGER_SPAN = 24;
public static final int HTTP_ACCESS_LOG = 25;
public static final int PROFILE_TASK = 26;
public static final int PROFILE_TASK_LOG = 27;
/**
* Catalog of scope, the metrics processor could use this to group all generated metrics by oal rt.
......
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.library.module.*;
......@@ -41,6 +42,6 @@ public class StorageModule extends ModuleDefine {
IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class,
IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class,
ITopologyQueryDAO.class, IMetricsQueryDAO.class, ITraceQueryDAO.class, IMetadataQueryDAO.class, IAggregationQueryDAO.class, IAlarmQueryDAO.class,
ITopNRecordsQueryDAO.class, ILogQueryDAO.class, IProfileTaskQueryDAO.class};
ITopNRecordsQueryDAO.class, ILogQueryDAO.class, IProfileTaskQueryDAO.class, IProfileTaskLogQueryDAO.class};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.profile;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLog;
import org.apache.skywalking.oap.server.core.storage.DAO;
import java.io.IOException;
import java.util.List;
/**
* process all profile task log query
*
* @author MrPro
*/
public interface IProfileTaskLogQueryDAO extends DAO {
/**
* search all task log list in appoint profile task id
* @param taskId profile task id, maybe null
* @return
*/
List<ProfileTaskLog> getTaskLogList(final String taskId) throws IOException;
}
......@@ -66,6 +66,8 @@ receiver-trace:
bufferFileCleanWhenRestart: ${RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
receiver-jvm:
default:
receiver-profile:
default:
service-mesh:
default:
bufferPath: ${SERVICE_MESH_BUFFER_PATH:../mesh-buffer/} # Path to trace buffer files, suggest to use absolute path
......
......@@ -54,7 +54,8 @@ public class ProfileMutation implements GraphQLMutationResolver {
creationRequest.getStartTime() == null ? -1 : creationRequest.getStartTime(),
creationRequest.getDuration(),
creationRequest.getMinDurationThreshold(),
creationRequest.getDumpPeriod()
creationRequest.getDumpPeriod(),
creationRequest.getMaxSamplingCount()
);
}
}
......@@ -37,5 +37,6 @@ public class ProfileTaskCreationRequest {
private Step durationUnit;
private int minDurationThreshold;
private int dumpPeriod;
private int maxSamplingCount;
}
Subproject commit a9b48130626e4b4dcf46bb8268c7125cc6f50814
Subproject commit dde9a0dad56617ccbf4226f5f71e667fd9620222
......@@ -40,6 +40,7 @@
<module>jaeger-receiver-plugin</module>
<module>receiver-proto</module>
<module>skywalking-so11y-receiver-plugin</module>
<module>skywalking-profile-receiver-plugin</module>
</modules>
<dependencies>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-receiver-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>7.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>skywalking-profile-receiver-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-sharing-server-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.profile.module;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* profile task receiver
*
* @author MrPro
*/
public class ProfileModule extends ModuleDefine {
public static final String NAME = "receiver-profile";
public ProfileModule() {
super(NAME);
}
@Override
public Class[] services() {
return new Class[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.profile.provider;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.receiver.profile.module.ProfileModule;
import org.apache.skywalking.oap.server.receiver.profile.provider.handler.ProfileTaskServiceHandler;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
/**
* profile task receiver default provider
*
* @author MrPro
*/
public class ProfileModuleProvider extends ModuleProvider {
@Override
public String name() {
return "default";
}
@Override
public Class<? extends ModuleDefine> module() {
return ProfileModule.class;
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
return null;
}
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(GRPCHandlerRegister.class);
grpcHandlerRegister.addHandler(new ProfileTaskServiceHandler(getManager()));
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
}
@Override
public String[] requiredModules() {
return new String[] {CoreModule.NAME, SharingServerModule.NAME};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.profile.provider.handler;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.language.profile.ProfileTaskCommandQuery;
import org.apache.skywalking.apm.network.language.profile.ProfileTaskGrpc;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
import org.apache.skywalking.oap.server.core.command.CommandService;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLogOperationType;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author MrPro
*/
public class ProfileTaskServiceHandler extends ProfileTaskGrpc.ProfileTaskImplBase implements GRPCHandler {
private ProfileTaskCache profileTaskCache;
private final CommandService commandService;
public ProfileTaskServiceHandler(ModuleManager moduleManager) {
this.profileTaskCache = moduleManager.find(CoreModule.NAME).provider().getService(ProfileTaskCache.class);
this.commandService = moduleManager.find(CoreModule.NAME).provider().getService(CommandService.class);
}
@Override
public void getProfileTaskCommands(ProfileTaskCommandQuery request, StreamObserver<Commands> responseObserver) {
// query profile task list by service id
final List<ProfileTask> profileTaskList = profileTaskCache.getProfileTaskList(request.getServiceId());
if (CollectionUtils.isEmpty(profileTaskList)) {
responseObserver.onNext(Commands.newBuilder().build());
responseObserver.onCompleted();
return;
}
// build command list
final Commands.Builder commandsBuilder = Commands.newBuilder();
final long lastCommandTime = request.getLastCommandTime();
for (ProfileTask profileTask : profileTaskList) {
// if command create time less than last command time, means sniffer already have task
if (profileTask.getCreateTime() <= lastCommandTime) {
continue;
}
// record profile task log
recordProfileTaskLog(profileTask, request);
// add command
commandsBuilder.addCommands(commandService.newProfileTaskCommand(profileTask).serialize().build());
}
responseObserver.onNext(commandsBuilder.build());
responseObserver.onCompleted();
}
private void recordProfileTaskLog(ProfileTask task, ProfileTaskCommandQuery query) {
final ProfileTaskLogRecord logRecord = new ProfileTaskLogRecord();
logRecord.setTaskId(task.getId());
logRecord.setInstanceId(query.getInstanceId());
logRecord.setOperationType(ProfileTaskLogOperationType.NOTIFIED.getCode());
logRecord.setOperationTime(System.currentTimeMillis());
// same with task time bucket, ensure record will ttl same with profile task
logRecord.setTimeBucket(TimeBucket.getRecordTimeBucket(task.getStartTime() + TimeUnit.MINUTES.toMillis(task.getDuration())));
RecordStreamProcessor.getInstance().in(logRecord);
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.receiver.profile.module.ProfileModule
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.receiver.profile.provider.ProfileModuleProvider
......@@ -36,6 +36,7 @@ import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCac
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
......@@ -119,6 +120,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearchClient, config.getProfileTaskQueryMaxSize()));
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearchClient, config.getProfileTaskQueryMaxSize()));
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLog;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLogOperationType;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
/**
* @author MrPro
*/
public class ProfileTaskLogEsDAO extends EsDAO implements IProfileTaskLogQueryDAO {
private final int queryMaxSize;
public ProfileTaskLogEsDAO(ElasticSearchClient client, int profileTaskQueryMaxSize) {
super(client);
// query log size use profile task query max size * per log count
this.queryMaxSize = profileTaskQueryMaxSize * 50;
}
@Override
public List<ProfileTaskLog> getTaskLogList(String taskId) throws IOException {
final SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
final BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(boolQueryBuilder);
if (taskId != null) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(ProfileTaskLogRecord.TASK_ID, taskId));
}
sourceBuilder.sort(ProfileTaskLogRecord.OPERATION_TIME, SortOrder.DESC);
sourceBuilder.size(queryMaxSize);
final SearchResponse response = getClient().search(ProfileTaskLogRecord.INDEX_NAME, sourceBuilder);
final LinkedList<ProfileTaskLog> tasks = new LinkedList<>();
for (SearchHit searchHit : response.getHits().getHits()) {
tasks.add(parseTaskLog(searchHit));
}
return tasks;
}
private ProfileTaskLog parseTaskLog(SearchHit data) {
return ProfileTaskLog.builder()
.id(data.getId())
.taskId((String) data.getSourceAsMap().get(ProfileTaskLogRecord.TASK_ID))
.instanceId(((Number) data.getSourceAsMap().get(ProfileTaskLogRecord.INSTANCE_ID)).intValue())
.operationType(ProfileTaskLogOperationType.parse(((Number) data.getSourceAsMap().get(ProfileTaskLogRecord.OPERATION_TYPE)).intValue()))
.operationTime(((Number) data.getSourceAsMap().get(ProfileTaskLogRecord.OPERATION_TIME)).longValue()).build();
}
}
......@@ -94,8 +94,10 @@ public class ProfileTaskQueryEsDAO extends EsDAO implements IProfileTaskQueryDAO
.serviceId(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.SERVICE_ID)).intValue())
.endpointName((String) data.getSourceAsMap().get(ProfileTaskNoneStream.ENDPOINT_NAME))
.startTime(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.START_TIME)).longValue())
.createTime(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.CREATE_TIME)).longValue())
.duration(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.DURATION)).intValue())
.minDurationThreshold(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.MIN_DURATION_THRESHOLD)).intValue())
.dumpPeriod(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.DUMP_PERIOD)).intValue()).build();
.dumpPeriod(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.DUMP_PERIOD)).intValue())
.maxSamplingCount(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.MAX_SAMPLING_COUNT)).intValue()).build();
}
}
......@@ -31,6 +31,7 @@ import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCac
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
......@@ -47,6 +48,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskLogEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopNRecordsQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
......@@ -124,6 +126,7 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearch7Client, config.getProfileTaskQueryMaxSize()));
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearch7Client, config.getProfileTaskQueryMaxSize()));
}
@Override
......
......@@ -22,6 +22,7 @@ import java.util.Properties;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.core.storage.ttl.GeneralStorageTTL;
......@@ -92,6 +93,7 @@ public class H2StorageProvider extends ModuleProvider {
this.registerServiceImplementation(ILogQueryDAO.class, new H2LogQueryDAO(h2Client));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new H2ProfileTaskQueryDAO(h2Client));
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new H2ProfileTaskLogQueryDAO(h2Client));
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLog;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLogOperationType;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
/**
* @author MrPro
*/
public class H2ProfileTaskLogQueryDAO implements IProfileTaskLogQueryDAO {
private JDBCHikariCPClient h2Client;
public H2ProfileTaskLogQueryDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
@Override
public List<ProfileTaskLog> getTaskLogList(String taskId) throws IOException {
final StringBuilder sql = new StringBuilder();
final ArrayList<Object> condition = new ArrayList<>(1);
sql.append("select * from ").append(ProfileTaskLogRecord.INDEX_NAME).append(" where 1=1 ");
if (taskId != null) {
sql.append(" and ").append(ProfileTaskLogRecord.TASK_ID).append(" = ?");
}
sql.append("ORDER BY ").append(ProfileTaskLogRecord.OPERATION_TIME).append(" DESC ");
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
final LinkedList<ProfileTaskLog> tasks = new LinkedList<>();
while (resultSet.next()) {
tasks.add(parseLog(resultSet));
}
return tasks;
}
} catch (SQLException | JDBCClientException e) {
throw new IOException(e);
}
}
private ProfileTaskLog parseLog(ResultSet data) throws SQLException {
return ProfileTaskLog.builder()
.id(data.getString("id"))
.taskId(data.getString(ProfileTaskLogRecord.TASK_ID))
.instanceId(data.getInt(ProfileTaskLogRecord.INSTANCE_ID))
.operationType(ProfileTaskLogOperationType.parse(data.getInt(ProfileTaskLogRecord.OPERATION_TYPE)))
.operationTime(data.getLong(ProfileTaskLogRecord.OPERATION_TIME)).build();
}
}
......@@ -98,8 +98,10 @@ public class H2ProfileTaskQueryDAO implements IProfileTaskQueryDAO {
.serviceId(data.getInt(ProfileTaskNoneStream.SERVICE_ID))
.endpointName(data.getString(ProfileTaskNoneStream.ENDPOINT_NAME))
.startTime(data.getLong(ProfileTaskNoneStream.START_TIME))
.createTime(data.getLong(ProfileTaskNoneStream.CREATE_TIME))
.duration(data.getInt(ProfileTaskNoneStream.DURATION))
.minDurationThreshold(data.getInt(ProfileTaskNoneStream.MIN_DURATION_THRESHOLD))
.dumpPeriod(data.getInt(ProfileTaskNoneStream.DUMP_PERIOD)).build();
.dumpPeriod(data.getInt(ProfileTaskNoneStream.DUMP_PERIOD))
.maxSamplingCount(data.getInt(ProfileTaskNoneStream.MAX_SAMPLING_COUNT)).build();
}
}
......@@ -29,6 +29,7 @@ import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCac
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
......@@ -107,6 +108,7 @@ public class MySQLStorageProvider extends ModuleProvider {
this.registerServiceImplementation(ILogQueryDAO.class, new MySQLLogQueryDAO(mysqlClient));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new H2ProfileTaskQueryDAO(mysqlClient));
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new H2ProfileTaskLogQueryDAO(mysqlClient));
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
......
......@@ -126,6 +126,8 @@ receiver-clr:
default:
#receiver-so11y:
# default:
receiver-profile:
default:
service-mesh:
default:
bufferPath: ${SW_SERVICE_MESH_BUFFER_PATH:../mesh-buffer/} # Path to trace buffer files, suggest to use absolute path
......
......@@ -116,6 +116,7 @@
</INSTRUMENTED_SERVICE_1>
<INSTRUMENTED_SERVICE_1_OPTS>
-DSW_AGENT_COLLECTOR_BACKEND_SERVICES=127.0.0.1:11800
-DSW_AGENT_PROFILE_ACTIVE=true
-DSW_AGENT_NAME=${provider.name}
-Dserver.port=9090
</INSTRUMENTED_SERVICE_1_OPTS>
......
......@@ -55,6 +55,7 @@ import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
......@@ -66,7 +67,7 @@ public class ProfileVerificationITCase {
private static final Logger LOGGER = LoggerFactory.getLogger(ProfileVerificationITCase.class);
private final RestTemplate restTemplate = new RestTemplate();
private final int retryInterval = 30;
private final int retryInterval = 10;
private ProfileClient profileClient;
private String instrumentedServiceUrl;
......@@ -74,10 +75,11 @@ public class ProfileVerificationITCase {
@Before
public void setUp() {
final String swWebappHost = System.getProperty("sw.webapp.host", "127.0.0.1");
// final String swWebappPort = System.getProperty("sw.webapp.port", "32783");
// final String swWebappPort = System.getProperty("sw.webapp.port", "32783");
final String swWebappPort = System.getProperty("sw.webapp.port", "12800");
final String instrumentedServiceHost = System.getProperty("service.host", "127.0.0.1");
final String instrumentedServicePort = System.getProperty("service.port", "32782");
// final String instrumentedServicePort = System.getProperty("service.port", "9090");
profileClient = new ProfileClient(swWebappHost, swWebappPort);
instrumentedServiceUrl = "http://" + instrumentedServiceHost + ":" + instrumentedServicePort;
}
......@@ -135,7 +137,8 @@ public class ProfileVerificationITCase {
.duration(5)
.startTime(-1)
.minDurationThreshold(10)
.dumpPeriod(10).build();
.dumpPeriod(10)
.maxSamplingCount(5).build();
// verify create task
final ProfileTaskCreationResult creationResult = profileClient.createProfileTask(creationRequest);
......@@ -144,19 +147,30 @@ public class ProfileVerificationITCase {
ProfileTaskCreationResultMatcher creationResultMatcher = new ProfileTaskCreationResultMatcher();
creationResultMatcher.verify(creationResult);
// verify get task list
final ProfileTasks tasks = profileClient.getProfileTaskList(
new ProfileTaskQuery()
.serviceId(creationRequest.getServiceId())
.endpointName("")
);
LOGGER.info("get profile task list: {}", tasks);
// verify get task list and logs
for (int i = 0; i < 10; i++) {
try {
final ProfileTasks tasks = profileClient.getProfileTaskList(
new ProfileTaskQuery()
.serviceId(creationRequest.getServiceId())
.endpointName("")
);
LOGGER.info("get profile task list: {}", tasks);
InputStream expectedInputStream =
new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
InputStream expectedInputStream =
new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
servicesMatcher.verify(tasks);
break;
} catch (Throwable e) {
if (i == 10 - 1) {
throw new IllegalStateException("match profile task list fail!", e);
}
TimeUnit.SECONDS.sleep(retryInterval);
}
}
final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
servicesMatcher.verify(tasks);
}
private void verifyServices(LocalDateTime minutesAgo) throws Exception {
......
......@@ -75,6 +75,7 @@
</INSTRUMENTED_SERVICE_1>
<INSTRUMENTED_SERVICE_1_OPTS>
-DSW_AGENT_COLLECTOR_BACKEND_SERVICES=127.0.0.1:11800
-DSW_AGENT_PROFILE_ACTIVE=true
-DSW_AGENT_NAME=${provider.name}
-Dserver.port=9090
</INSTRUMENTED_SERVICE_1_OPTS>
......
......@@ -55,6 +55,7 @@ import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
......@@ -66,7 +67,7 @@ public class ProfileVerificationITCase {
private static final Logger LOGGER = LoggerFactory.getLogger(ProfileVerificationITCase.class);
private final RestTemplate restTemplate = new RestTemplate();
private final int retryInterval = 30;
private final int retryInterval = 10;
private ProfileClient profileClient;
private String instrumentedServiceUrl;
......@@ -78,6 +79,7 @@ public class ProfileVerificationITCase {
final String swWebappPort = System.getProperty("sw.webapp.port", "12800");
final String instrumentedServiceHost = System.getProperty("service.host", "127.0.0.1");
final String instrumentedServicePort = System.getProperty("service.port", "32782");
// final String instrumentedServicePort = System.getProperty("service.port", "9090");
profileClient = new ProfileClient(swWebappHost, swWebappPort);
instrumentedServiceUrl = "http://" + instrumentedServiceHost + ":" + instrumentedServicePort;
}
......@@ -134,7 +136,8 @@ public class ProfileVerificationITCase {
.duration(5)
.startTime(-1)
.minDurationThreshold(10)
.dumpPeriod(10).build();
.dumpPeriod(10)
.maxSamplingCount(5).build();
// verify create task
final ProfileTaskCreationResult creationResult = profileClient.createProfileTask(creationRequest);
......@@ -143,19 +146,30 @@ public class ProfileVerificationITCase {
ProfileTaskCreationResultMatcher creationResultMatcher = new ProfileTaskCreationResultMatcher();
creationResultMatcher.verify(creationResult);
// verify get task list
final ProfileTasks tasks = profileClient.getProfileTaskList(
new ProfileTaskQuery()
.serviceId(creationRequest.getServiceId())
.endpointName("")
);
LOGGER.info("get profile task list: {}", tasks);
// verify get task list and logs
for (int i = 0; i < 10; i++) {
try {
final ProfileTasks tasks = profileClient.getProfileTaskList(
new ProfileTaskQuery()
.serviceId(creationRequest.getServiceId())
.endpointName("")
);
LOGGER.info("get profile task list: {}", tasks);
InputStream expectedInputStream =
new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
InputStream expectedInputStream =
new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
servicesMatcher.verify(tasks);
break;
} catch (Throwable e) {
if (i == 10 - 1) {
throw new IllegalStateException("match profile task list fail!", e);
}
TimeUnit.SECONDS.sleep(retryInterval);
}
}
final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
servicesMatcher.verify(tasks);
}
private void verifyServices(LocalDateTime minutesAgo) throws Exception {
......
......@@ -95,6 +95,7 @@
<INSTRUMENTED_SERVICE_1_OPTS>
-DSW_AGENT_COLLECTOR_BACKEND_SERVICES=127.0.0.1:11800
-DSW_AGENT_NAME=${provider.name}
-DSW_AGENT_PROFILE_ACTIVE=true
-Dserver.port=9090
</INSTRUMENTED_SERVICE_1_OPTS>
</env>
......
......@@ -126,6 +126,8 @@ receiver-clr:
default:
#receiver-so11y:
# default:
receiver-profile:
default:
service-mesh:
default:
bufferPath: ${SW_SERVICE_MESH_BUFFER_PATH:../mesh-buffer/} # Path to trace buffer files, suggest to use absolute path
......
......@@ -55,6 +55,7 @@ import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
......@@ -66,7 +67,7 @@ public class ProfileVerificationITCase {
private static final Logger LOGGER = LoggerFactory.getLogger(ProfileVerificationITCase.class);
private final RestTemplate restTemplate = new RestTemplate();
private final int retryInterval = 30;
private final int retryInterval = 10;
private ProfileClient profileClient;
private String instrumentedServiceUrl;
......@@ -74,10 +75,11 @@ public class ProfileVerificationITCase {
@Before
public void setUp() {
final String swWebappHost = System.getProperty("sw.webapp.host", "127.0.0.1");
// final String swWebappPort = System.getProperty("sw.webapp.port", "32783");
// final String swWebappPort = System.getProperty("sw.webapp.port", "32783");
final String swWebappPort = System.getProperty("sw.webapp.port", "12800");
final String instrumentedServiceHost = System.getProperty("service.host", "127.0.0.1");
final String instrumentedServicePort = System.getProperty("service.port", "32782");
// final String instrumentedServicePort = System.getProperty("service.port", "9090");
profileClient = new ProfileClient(swWebappHost, swWebappPort);
instrumentedServiceUrl = "http://" + instrumentedServiceHost + ":" + instrumentedServicePort;
}
......@@ -134,7 +136,8 @@ public class ProfileVerificationITCase {
.duration(5)
.startTime(-1)
.minDurationThreshold(10)
.dumpPeriod(10).build();
.dumpPeriod(10)
.maxSamplingCount(5).build();
// verify create task
final ProfileTaskCreationResult creationResult = profileClient.createProfileTask(creationRequest);
......@@ -143,19 +146,30 @@ public class ProfileVerificationITCase {
ProfileTaskCreationResultMatcher creationResultMatcher = new ProfileTaskCreationResultMatcher();
creationResultMatcher.verify(creationResult);
// verify get task list
final ProfileTasks tasks = profileClient.getProfileTaskList(
new ProfileTaskQuery()
.serviceId(creationRequest.getServiceId())
.endpointName("")
);
LOGGER.info("get profile task list: {}", tasks);
// verify get task list and logs
for (int i = 0; i < 10; i++) {
try {
final ProfileTasks tasks = profileClient.getProfileTaskList(
new ProfileTaskQuery()
.serviceId(creationRequest.getServiceId())
.endpointName("")
);
LOGGER.info("get profile task list: {}", tasks);
InputStream expectedInputStream =
new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
InputStream expectedInputStream =
new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
servicesMatcher.verify(tasks);
break;
} catch (Throwable e) {
if (i == 10 - 1) {
throw new IllegalStateException("match profile task list fail!", e);
}
TimeUnit.SECONDS.sleep(retryInterval);
}
}
final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
servicesMatcher.verify(tasks);
}
private void verifyServices(LocalDateTime minutesAgo) throws Exception {
......
......@@ -59,7 +59,8 @@ public class ProfileClient extends SimpleQueryClient {
.replace("{duration}", String.valueOf(creationRequest.getDuration()))
.replace("{startTime}", String.valueOf(creationRequest.getStartTime()))
.replace("{minDurationThreshold}", String.valueOf(creationRequest.getMinDurationThreshold()))
.replace("{dumpPeriod}", String.valueOf(creationRequest.getDumpPeriod()));
.replace("{dumpPeriod}", String.valueOf(creationRequest.getDumpPeriod()))
.replace("{maxSamplingCount}", String.valueOf(creationRequest.getMaxSamplingCount()));
final ResponseEntity<GQLResponse<ProfileTaskCreationResultWrapper>> responseEntity = restTemplate.exchange(
new RequestEntity<>(queryString, HttpMethod.POST, URI.create(endpointUrl)),
new ParameterizedTypeReference<GQLResponse<ProfileTaskCreationResultWrapper>>() {
......
......@@ -38,5 +38,6 @@ public class ProfileTaskCreationRequest {
private int duration;
private int minDurationThreshold;
private int dumpPeriod;
private int maxSamplingCount;
}
......@@ -21,6 +21,8 @@ import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import java.util.List;
/**
* Profile task bean for e2e GraphQL test result
*
......@@ -38,5 +40,8 @@ public class ProfileTask {
private String duration;
private String minDurationThreshold;
private String dumpPeriod;
private String maxSamplingCount;
private List<ProfileTaskLog> logs;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.e2e.profile.query;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
* @author MrPro
*/
@Setter
@Getter
@ToString
public class ProfileTaskLog {
private String id;
private String instanceId;
private String operationType;
private String operationTime;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.e2e.profile.query;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.e2e.verification.AbstractMatcher;
/**
* @author MrPro
*/
@Setter
@Getter
public class ProfileTaskLogMatcher extends AbstractMatcher<ProfileTaskLog> {
private String id;
private String instanceId;
private String operationType;
private String operationTime;
@Override
public void verify(ProfileTaskLog profileTaskLog) {
doVerify(id, profileTaskLog.getId());
doVerify(instanceId, profileTaskLog.getInstanceId());
doVerify(operationType, profileTaskLog.getOperationType());
doVerify(operationTime, profileTaskLog.getOperationTime());
}
}
......@@ -21,6 +21,9 @@ package org.apache.skywalking.e2e.profile.query;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.e2e.verification.AbstractMatcher;
import org.assertj.core.api.Assertions;
import java.util.List;
/**
* @author MrPro
......@@ -36,6 +39,9 @@ public class ProfileTaskMatcher extends AbstractMatcher<ProfileTask> {
private String duration;
private String minDurationThreshold;
private String dumpPeriod;
private String maxSamplingCount;
private List<ProfileTaskLogMatcher> logs;
@Override
public void verify(ProfileTask task) {
......@@ -46,6 +52,13 @@ public class ProfileTaskMatcher extends AbstractMatcher<ProfileTask> {
doVerify(duration, task.getDuration());
doVerify(minDurationThreshold, task.getMinDurationThreshold());
doVerify(dumpPeriod, task.getDumpPeriod());
// verify logs
Assertions.assertThat(task.getLogs()).hasSameSizeAs(this.logs);
int size = this.getLogs().size();
for (int i = 0; i < size; i++) {
this.getLogs().get(i).verify(task.getLogs().get(i));
}
}
}
......@@ -22,3 +22,9 @@ tasks:
duration: gt 0
minDurationThreshold: gt 0
dumpPeriod: gt 0
maxSamplingCount: gt 0
logs:
- id: not null
instanceId: gt 0
operationType: NOTIFIED
operationTime: gt 0
......@@ -24,6 +24,13 @@
duration: duration
minDurationThreshold: minDurationThreshold
dumpPeriod: dumpPeriod
maxSamplingCount: maxSamplingCount
logs {
id
instanceId
operationType
operationTime
}
}
}",
"variables": {
......
......@@ -28,7 +28,8 @@
"duration": {duration},
"startTime": {startTime},
"minDurationThreshold": {minDurationThreshold},
"dumpPeriod": {dumpPeriod}
"dumpPeriod": {dumpPeriod},
"maxSamplingCount": {maxSamplingCount}
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册