未验证 提交 25214e91 编写于 作者: Z Zhenxu Ke 提交者: GitHub

Collect and report agent starting / shutdown events (#6559)

上级 3553b2ed
......@@ -17,6 +17,8 @@
name: "CodeQL"
on:
push:
branches: [ 'master' ]
pull_request:
branches: [ 'master' ]
schedule:
......
......@@ -21,6 +21,7 @@ Release Notes.
* Fix ClassCastException by making CallbackAdapterInterceptor to implement EnhancedInstance interface in the spring-kafka plugin.
* Fix NullPointerException with KafkaProducer.send(record).
* Support config `agent.span_limit_per_segment` can be changed in the runtime.
* Collect and report agent starting / shutdown events.
#### OAP-Backend
* Allow user-defined `JAVA_OPTS` in the startup script.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core;
import java.util.UUID;
import lombok.Getter;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.os.OSUtil;
import static org.apache.skywalking.apm.util.StringUtil.isEmpty;
@Getter
public class ServiceInstanceGenerator implements BootService {
@Override
public void prepare() throws Throwable {
if (!isEmpty(Config.Agent.INSTANCE_NAME)) {
return;
}
Config.Agent.INSTANCE_NAME = UUID.randomUUID().toString().replaceAll("-", "") + "@" + OSUtil.getIPV4();
}
@Override
public void boot() throws Throwable {
}
@Override
public void onComplete() throws Throwable {
}
@Override
public void shutdown() throws Throwable {
}
@Override
public int priority() {
return Integer.MAX_VALUE;
}
}
......@@ -30,4 +30,13 @@ public interface BootService {
void onComplete() throws Throwable;
void shutdown() throws Throwable;
/**
* {@code BootService}s with higher priorities will be started earlier, and shut down later than those {@code BootService}s with lower priorities.
*
* @return the priority of this {@code BootService}.
*/
default int priority() {
return 0;
}
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.agent.core.boot;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
......@@ -46,13 +47,13 @@ public enum ServiceManager {
}
public void shutdown() {
for (BootService service : bootedServices.values()) {
bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service -> {
try {
service.shutdown();
} catch (Throwable e) {
LOGGER.error(e, "ServiceManager try to shutdown [{}] fail.", service.getClass().getName());
}
}
});
}
private Map<Class, BootService> loadAllServices() {
......@@ -99,23 +100,23 @@ public enum ServiceManager {
}
private void prepare() {
for (BootService service : bootedServices.values()) {
bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority)).forEach(service -> {
try {
service.prepare();
} catch (Throwable e) {
LOGGER.error(e, "ServiceManager try to pre-start [{}] fail.", service.getClass().getName());
}
}
});
}
private void startup() {
for (BootService service : bootedServices.values()) {
bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority)).forEach(service -> {
try {
service.boot();
} catch (Throwable e) {
LOGGER.error(e, "ServiceManager try to start [{}] fail.", service.getClass().getName());
}
}
});
}
private void onComplete() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.commands.CommandService;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.event.v3.Event;
import org.apache.skywalking.apm.network.event.v3.EventServiceGrpc;
import org.apache.skywalking.apm.network.event.v3.Source;
import org.apache.skywalking.apm.network.event.v3.Type;
import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
import static org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
@DefaultImplementor
public class EventReportServiceClient implements BootService, GRPCChannelListener {
private static final ILog LOGGER = LogManager.getLogger(EventReportServiceClient.class);
private final AtomicBoolean reported = new AtomicBoolean();
private Event.Builder startingEvent;
private EventServiceGrpc.EventServiceStub eventServiceStub;
private GRPCChannelStatus status;
@Override
public void prepare() throws Throwable {
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
final RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean();
startingEvent = Event.newBuilder()
.setUuid(UUID.randomUUID().toString())
.setName("Start")
.setStartTime(runtimeMxBean.getStartTime())
.setMessage("Start Java Application")
.setType(Type.Normal)
.setSource(
Source.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.build()
)
.putParameters(
"OPTS",
runtimeMxBean.getInputArguments()
.stream()
.sorted()
.collect(Collectors.joining(" "))
);
}
@Override
public void boot() throws Throwable {
}
@Override
public void onComplete() throws Throwable {
startingEvent.setEndTime(System.currentTimeMillis());
reportStartingEvent();
}
@Override
public void shutdown() throws Throwable {
if (!CONNECTED.equals(status)) {
return;
}
final CountDownLatch latch = new CountDownLatch(1);
final Event.Builder shutdownEvent = Event.newBuilder()
.setUuid(UUID.randomUUID().toString())
.setName("Shutdown")
.setStartTime(System.currentTimeMillis())
.setEndTime(System.currentTimeMillis())
.setMessage("Shutting down Java Application")
.setType(Type.Normal)
.setSource(
Source.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.build()
);
final StreamObserver<Event> collector = eventServiceStub.collect(new StreamObserver<Commands>() {
@Override
public void onNext(final Commands commands) {
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
@Override
public void onError(final Throwable t) {
LOGGER.error("Failed to report shutdown event.", t);
// Ignore status change at shutting down stage.
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
});
collector.onNext(shutdownEvent.build());
collector.onCompleted();
latch.await();
}
@Override
public void statusChanged(final GRPCChannelStatus status) {
this.status = status;
if (!CONNECTED.equals(status)) {
return;
}
final Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
eventServiceStub = EventServiceGrpc.newStub(channel);
eventServiceStub = eventServiceStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS);
reportStartingEvent();
}
private void reportStartingEvent() {
if (reported.compareAndSet(false, true)) {
return;
}
final StreamObserver<Event> collector = eventServiceStub.collect(new StreamObserver<Commands>() {
@Override
public void onNext(final Commands commands) {
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
@Override
public void onError(final Throwable t) {
LOGGER.error("Failed to report starting event.", t);
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
reported.set(false);
}
@Override
public void onCompleted() {
}
});
collector.onNext(startingEvent.build());
collector.onCompleted();
}
}
......@@ -208,4 +208,9 @@ public class GRPCChannelManager implements BootService, Runnable {
}
return false;
}
@Override
public int priority() {
return Integer.MAX_VALUE;
}
}
......@@ -21,7 +21,6 @@ package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.Channel;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
......@@ -41,7 +40,6 @@ import org.apache.skywalking.apm.network.management.v3.InstancePingPkg;
import org.apache.skywalking.apm.network.management.v3.InstanceProperties;
import org.apache.skywalking.apm.network.management.v3.ManagementServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.apm.util.StringUtil;
import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
......@@ -78,10 +76,6 @@ public class ServiceManagementClient implements BootService, Runnable, GRPCChann
.setValue(Config.Agent.INSTANCE_PROPERTIES.get(key))
.build());
}
Config.Agent.INSTANCE_NAME = StringUtil.isEmpty(Config.Agent.INSTANCE_NAME)
? UUID.randomUUID().toString().replaceAll("-", "") + "@" + OSUtil.getIPV4()
: Config.Agent.INSTANCE_NAME;
}
@Override
......
......@@ -33,4 +33,6 @@ org.apache.skywalking.apm.agent.core.meter.MeterService
org.apache.skywalking.apm.agent.core.meter.MeterSender
org.apache.skywalking.apm.agent.core.context.status.StatusCheckService
org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient
org.apache.skywalking.apm.agent.core.conf.dynamic.ConfigurationDiscoveryService
\ No newline at end of file
org.apache.skywalking.apm.agent.core.conf.dynamic.ConfigurationDiscoveryService
org.apache.skywalking.apm.agent.core.remote.EventReportServiceClient
org.apache.skywalking.apm.agent.core.ServiceInstanceGenerator
......@@ -58,7 +58,7 @@ public class ServiceManagerTest {
public void testServiceDependencies() throws Exception {
HashMap<Class, BootService> registryService = getFieldValue(ServiceManager.INSTANCE, "bootedServices");
assertThat(registryService.size(), is(18));
assertThat(registryService.size(), is(20));
assertTraceSegmentServiceClient(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class));
assertContextManager(ServiceManager.INSTANCE.findService(ContextManager.class));
......@@ -107,7 +107,7 @@ public class ServiceManagerTest {
assertNotNull(service);
List<GRPCChannelListener> listeners = getFieldValue(service, "listeners");
assertEquals(listeners.size(), 8);
assertEquals(listeners.size(), 9);
}
private void assertSamplingService(SamplingService service) {
......
......@@ -95,7 +95,7 @@ public class Event extends Metrics {
@Column(columnName = MESSAGE)
private String message;
@Column(columnName = PARAMETERS, storageOnly = true)
@Column(columnName = PARAMETERS, storageOnly = true, length = 1024)
private String parameters;
@Column(columnName = START_TIME)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册