提交 7c4c914a 编写于 作者: P peng-yongsheng

Define analysis segment parser module that used for split segment.

上级 8e02787d
......@@ -28,6 +28,5 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>agent-grpc-define</artifactId>
<packaging>jar</packaging>
</project>
\ No newline at end of file
......@@ -50,5 +50,10 @@
<artifactId>collector-grpc-manager-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>segment-parser-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -27,6 +27,7 @@ import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.ServiceNa
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.TraceSegmentServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.naming.AgentGRPCNamingHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.naming.AgentGRPCNamingListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.apache.skywalking.apm.collector.cluster.service.ModuleRegisterService;
......@@ -85,7 +86,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, GRPCManagerModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, GRPCManagerModule.NAME, AnalysisSegmentParserModule.NAME};
}
private void addHandlers(Server gRPCServer) {
......
......@@ -16,12 +16,11 @@
*
*/
package org.apache.skywalking.apm.collector.agent.grpc.provider.handler;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.apache.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.apache.skywalking.apm.network.proto.Downstream;
......@@ -37,17 +36,17 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
private final ITraceSegmentService traceSegmentService;
private final ISegmentParseService segmentParseService;
public TraceSegmentServiceHandler(ModuleManager moduleManager) {
this.traceSegmentService = moduleManager.find(AgentStreamModule.NAME).getService(ITraceSegmentService.class);
this.segmentParseService = moduleManager.find(AnalysisSegmentParserModule.NAME).getService(ISegmentParseService.class);
}
@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
return new StreamObserver<UpstreamSegment>() {
@Override public void onNext(UpstreamSegment segment) {
logger.debug("receive segment");
traceSegmentService.send(segment);
segmentParseService.parse(segment, ISegmentParseService.Source.Agent);
}
@Override public void onError(Throwable throwable) {
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-analysis</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>5.0.0-alpha</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>analysis-segment-parser</artifactId>
<packaging>pom</packaging>
<modules>
<module>segment-parser-define</module>
<module>segment-parser-provider</module>
</modules>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>analysis-segment-parser</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>5.0.0-alpha</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>segment-parser-define</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.define;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParserListenerRegister;
import org.apache.skywalking.apm.collector.core.module.Module;
/**
* @author peng-yongsheng
*/
public class AnalysisSegmentParserModule extends Module {
public static final String NAME = "analysis_segment_parser";
@Override public String name() {
return NAME;
}
@Override public Class[] services() {
return new Class[] {ISegmentParseService.class, ISegmentParserListenerRegister.class};
}
}
......@@ -17,7 +17,7 @@
*/
package org.apache.skywalking.apm.collector.agent.stream.parser.standardization;
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator;
import org.apache.skywalking.apm.network.proto.RefType;
import org.apache.skywalking.apm.network.proto.TraceSegmentReference;
......
......@@ -17,7 +17,7 @@
*/
package org.apache.skywalking.apm.collector.agent.stream.parser.standardization;
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator;
import org.apache.skywalking.apm.network.proto.TraceSegmentObject;
import org.apache.skywalking.apm.network.proto.UniqueId;
......
......@@ -17,7 +17,7 @@
*/
package org.apache.skywalking.apm.collector.agent.stream.parser.standardization;
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator;
import org.apache.skywalking.apm.network.proto.SpanLayer;
import org.apache.skywalking.apm.network.proto.SpanObject;
......
......@@ -16,14 +16,13 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener;
package org.apache.skywalking.apm.collector.agent.stream.parser;
import org.apache.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
/**
* @author peng-yongsheng
*/
public interface EntrySpanListener extends SpanListener {
void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId);
}
}
\ No newline at end of file
......@@ -16,10 +16,9 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener;
package org.apache.skywalking.apm.collector.agent.stream.parser;
import org.apache.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
/**
* @author peng-yongsheng
......
......@@ -16,10 +16,9 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener;
package org.apache.skywalking.apm.collector.agent.stream.parser;
import org.apache.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
/**
* @author peng-yongsheng
......
......@@ -16,10 +16,9 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener;
package org.apache.skywalking.apm.collector.agent.stream.parser;
import org.apache.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
/**
* @author peng-yongsheng
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.service;
import org.apache.skywalking.apm.collector.core.module.Service;
import org.apache.skywalking.apm.network.proto.UpstreamSegment;
/**
* @author peng-yongsheng
*/
public interface ISegmentParseService extends Service {
void parse(UpstreamSegment segment, Source source);
enum Source {
Agent, Buffer
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.define.service;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListener;
import org.apache.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface ISegmentParserListenerRegister extends Service {
void register(SpanListener spanListener);
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>analysis-segment-parser</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>5.0.0-alpha</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>segment-parser-provider</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>segment-parser-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider;
import java.util.Properties;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParserListenerRegister;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.SegmentParserListenerManager;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.service.SegmentParseService;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.service.SegmentParserListenerRegister;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
/**
* @author peng-yongsheng
*/
public class AnalysisTraceParseModuleProvider extends ModuleProvider {
public static final String NAME = "default";
private SegmentParserListenerManager listenerManager;
@Override public String name() {
return NAME;
}
@Override public Class<? extends Module> module() {
return AnalysisSegmentParserModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.listenerManager = new SegmentParserListenerManager();
this.registerServiceImplementation(ISegmentParserListenerRegister.class, new SegmentParserListenerRegister(listenerManager));
this.registerServiceImplementation(ISegmentParseService.class, new SegmentParseService(getManager(), listenerManager));
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
@Override public String[] requiredModules() {
return new String[0];
}
}
......@@ -16,33 +16,29 @@
*
*/
package org.apache.skywalking.apm.collector.agent.stream.parser;
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceIdExchanger;
import org.apache.skywalking.apm.collector.agent.stream.parser.standardization.SegmentDecorator;
import org.apache.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntrySpanListener;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.apache.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator;
import org.apache.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardization;
import org.apache.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.apache.skywalking.apm.collector.agent.stream.parser.standardization.SpanIdExchanger;
import org.apache.skywalking.apm.collector.agent.stream.worker.trace.application.ApplicationComponentSpanListener;
import org.apache.skywalking.apm.collector.agent.stream.worker.trace.application.ApplicationMappingSpanListener;
import org.apache.skywalking.apm.collector.agent.stream.worker.trace.application.ApplicationReferenceMetricSpanListener;
import org.apache.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTraceSpanListener;
import org.apache.skywalking.apm.collector.agent.stream.worker.trace.instance.InstanceMetricSpanListener;
import org.apache.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostSpanListener;
import org.apache.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceReferenceMetricSpanListener;
import javax.swing.text.Segment;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.ReferenceDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SegmentDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.EntrySpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.ExitSpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.FirstSpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.GlobalTraceIdsListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.LocalSpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.ReferenceIdExchanger;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.SegmentStandardization;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.SpanIdExchanger;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.graph.Graph;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.table.segment.Segment;
import org.apache.skywalking.apm.network.proto.SpanType;
import org.apache.skywalking.apm.network.proto.TraceSegmentObject;
import org.apache.skywalking.apm.network.proto.UniqueId;
......@@ -57,25 +53,17 @@ public class SegmentParse {
private final Logger logger = LoggerFactory.getLogger(SegmentParse.class);
private final List<SpanListener> spanListeners;
private final ModuleManager moduleManager;
private final SegmentParserListenerManager listenerManager;
private String segmentId;
private long timeBucket = 0;
public SegmentParse(ModuleManager moduleManager) {
public SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
this.moduleManager = moduleManager;
this.spanListeners = new ArrayList<>();
this.spanListeners.add(new ApplicationComponentSpanListener());
this.spanListeners.add(new ApplicationMappingSpanListener());
this.spanListeners.add(new ApplicationReferenceMetricSpanListener(moduleManager));
this.spanListeners.add(new SegmentCostSpanListener(moduleManager));
this.spanListeners.add(new GlobalTraceSpanListener());
this.spanListeners.add(new ServiceEntrySpanListener(moduleManager));
this.spanListeners.add(new ServiceReferenceMetricSpanListener());
this.spanListeners.add(new InstanceMetricSpanListener());
this.listenerManager = listenerManager;
}
public boolean parse(UpstreamSegment segment, Source source) {
public boolean parse(UpstreamSegment segment, ISegmentParseService.Source source) {
try {
List<UniqueId> traceIds = segment.getGlobalTraceIdsList();
TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getSegment());
......@@ -85,7 +73,7 @@ public class SegmentParse {
if (!preBuild(traceIds, segmentDecorator)) {
logger.debug("This segment id exchange not success, write to buffer file, id: {}", segmentId);
if (source.equals(Source.Agent)) {
if (source.equals(ISegmentParseService.Source.Agent)) {
writeToBufferFile(segmentId, segment);
}
return false;
......@@ -184,12 +172,12 @@ public class SegmentParse {
}
private void notifyListenerToBuild() {
spanListeners.forEach(SpanListener::build);
listenerManager.getSpanListeners().forEach(SpanListener::build);
}
private void notifyExitListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
for (SpanListener listener : listenerManager.getSpanListeners()) {
if (listener instanceof ExitSpanListener) {
((ExitSpanListener)listener).parseExit(spanDecorator, applicationId, applicationInstanceId, segmentId);
}
......@@ -198,7 +186,7 @@ public class SegmentParse {
private void notifyEntryListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
for (SpanListener listener : listenerManager.getSpanListeners()) {
if (listener instanceof EntrySpanListener) {
((EntrySpanListener)listener).parseEntry(spanDecorator, applicationId, applicationInstanceId, segmentId);
}
......@@ -207,7 +195,7 @@ public class SegmentParse {
private void notifyLocalListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
for (SpanListener listener : listenerManager.getSpanListeners()) {
if (listener instanceof LocalSpanListener) {
((LocalSpanListener)listener).parseLocal(spanDecorator, applicationId, applicationInstanceId, segmentId);
}
......@@ -216,7 +204,7 @@ public class SegmentParse {
private void notifyFirstListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
for (SpanListener listener : listenerManager.getSpanListeners()) {
if (listener instanceof FirstSpanListener) {
((FirstSpanListener)listener).parseFirst(spanDecorator, applicationId, applicationInstanceId, segmentId);
}
......@@ -224,14 +212,10 @@ public class SegmentParse {
}
private void notifyGlobalsListener(UniqueId uniqueId) {
for (SpanListener listener : spanListeners) {
for (SpanListener listener : listenerManager.getSpanListeners()) {
if (listener instanceof GlobalTraceIdsListener) {
((GlobalTraceIdsListener)listener).parseGlobalTraceId(uniqueId);
}
}
}
public enum Source {
Agent, Buffer
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListener;
/**
* @author peng-yongsheng
*/
public class SegmentParserListenerManager {
private List<SpanListener> spanListeners;
public SegmentParserListenerManager() {
this.spanListeners = new ArrayList<>();
}
public void add(SpanListener spanListener) {
spanListeners.add(spanListener);
}
public List<SpanListener> getSpanListeners() {
return spanListeners;
}
}
......@@ -17,7 +17,9 @@
*/
package org.apache.skywalking.apm.collector.agent.stream.parser.standardization;
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.StandardBuilder;
/**
* @author peng-yongsheng
......
......@@ -17,8 +17,9 @@
*/
package org.apache.skywalking.apm.collector.agent.stream.parser.standardization;
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.ReferenceDecorator;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.apache.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
......
......@@ -17,7 +17,7 @@
*/
package org.apache.skywalking.apm.collector.agent.stream.parser.standardization;
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization;
import org.apache.skywalking.apm.collector.core.data.EndOfBatchQueueMessage;
import org.apache.skywalking.apm.network.proto.UpstreamSegment;
......
......@@ -16,11 +16,11 @@
*
*/
package org.apache.skywalking.apm.collector.agent.stream.parser.standardization;
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization;
import org.apache.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.apache.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.service;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.SegmentParse;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.SegmentParserListenerManager;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.network.proto.UpstreamSegment;
/**
* @author peng-yongsheng
*/
public class SegmentParseService implements ISegmentParseService {
private final ModuleManager moduleManager;
private final SegmentParserListenerManager listenerManager;
public SegmentParseService(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
}
@Override public void parse(UpstreamSegment segment, Source source) {
SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
segmentParse.parse(segment, source);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.service;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParserListenerRegister;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.SegmentParserListenerManager;
/**
* @author peng-yongsheng
*/
public class SegmentParserListenerRegister implements ISegmentParserListenerRegister {
private final SegmentParserListenerManager listenerManager;
public SegmentParserListenerRegister(SegmentParserListenerManager listenerManager) {
this.listenerManager = listenerManager;
}
@Override public void register(SpanListener spanListener) {
this.listenerManager.add(spanListener);
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.apm.collector.analysis.segment.parser.provider.AnalysisTraceParseModuleProvider
\ No newline at end of file
......@@ -28,6 +28,16 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-analysis</artifactId>
<packaging>pom</packaging>
<modules>
<module>analysis-segment-parser</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册