From 71d27b1840146a36174f7186ee336da630b0ce5a Mon Sep 17 00:00:00 2001 From: peng-yongsheng <8082209@qq.com> Date: Sun, 12 Nov 2017 10:24:34 +0800 Subject: [PATCH] Segment standardization --- .../agent/stream/buffer/BufferFileConfig.java | 27 +++ .../collector/agent/stream/buffer/Offset.java | 99 ++++++++ .../agent/stream/buffer/OffsetManager.java | 147 ++++++++++++ .../stream/buffer/SegmentBufferConfig.java | 28 +++ .../stream/buffer/SegmentBufferManager.java | 102 ++++++++ .../stream/buffer/SegmentBufferReader.java | 147 ++++++++++++ .../stream/parser/EntrySpanListener.java | 28 +++ .../agent/stream/parser/ExitSpanListener.java | 28 +++ .../stream/parser/FirstSpanListener.java | 28 +++ .../stream/parser/GlobalTraceIdsListener.java | 28 +++ .../stream/parser/LocalSpanListener.java | 28 +++ .../agent/stream/parser/RefsListener.java | 28 +++ .../agent/stream/parser/SegmentParse.java | 219 ++++++++++++++++++ .../agent/stream/parser/SpanListener.java | 26 +++ .../parser/standardization/IdExchanger.java | 26 +++ .../standardization/ReferenceDecorator.java | 190 +++++++++++++++ .../standardization/ReferenceIdExchanger.java | 86 +++++++ .../standardization/SegmentDecorator.java | 86 +++++++ .../SegmentStandardizationWorker.java | 72 ++++++ .../parser/standardization/SpanDecorator.java | 200 ++++++++++++++++ .../standardization/SpanIdExchanger.java | 71 ++++++ .../standardization/StandardBuilder.java | 26 +++ .../agent/stream/util/FileUtils.java | 89 +++++++ .../worker/register/ServiceNameService.java | 50 ++++ .../trace/global/GlobalTraceSpanListener.java | 73 ++++++ .../instance/InstPerformanceSpanListener.java | 64 +++++ .../trace/node/NodeComponentSpanListener.java | 97 ++++++++ .../trace/node/NodeMappingSpanListener.java | 68 ++++++ .../noderef/NodeReferenceSpanListener.java | 127 ++++++++++ .../segment/SegmentCostSpanListener.java | 98 ++++++++ .../service/ServiceEntrySpanListener.java | 85 +++++++ .../ServiceReferenceSpanListener.java | 132 +++++++++++ .../collector/core/config/SystemConfig.java | 26 +++ 33 files changed, 2629 insertions(+) create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/BufferFileConfig.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/Offset.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/OffsetManager.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferConfig.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferManager.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferReader.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/EntrySpanListener.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/ExitSpanListener.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/FirstSpanListener.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/GlobalTraceIdsListener.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/LocalSpanListener.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/RefsListener.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SpanListener.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/IdExchanger.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/ReferenceDecorator.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/ReferenceIdExchanger.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentDecorator.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SpanDecorator.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SpanIdExchanger.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/StandardBuilder.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/util/FileUtils.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/global/GlobalTraceSpanListener.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstPerformanceSpanListener.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeComponentSpanListener.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeMappingSpanListener.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/noderef/NodeReferenceSpanListener.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/segment/SegmentCostSpanListener.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/serviceref/ServiceReferenceSpanListener.java create mode 100644 apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/config/SystemConfig.java diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/BufferFileConfig.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/BufferFileConfig.java new file mode 100644 index 0000000000..6cc6f77241 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/BufferFileConfig.java @@ -0,0 +1,27 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.buffer; + +/** + * @author peng-yongsheng + */ +public class BufferFileConfig { + public static int BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024; + public static int BUFFER_SEGMENT_MAX_FILE_SIZE = 10 * 1024 * 1024; +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/Offset.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/Offset.java new file mode 100644 index 0000000000..d2eba51edb --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/Offset.java @@ -0,0 +1,99 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.buffer; + +/** + * @author peng-yongsheng + */ +public class Offset { + + private static final String SPLIT_CHARACTER = ","; + private ReadOffset readOffset; + private WriteOffset writeOffset; + + public Offset() { + readOffset = new ReadOffset(); + writeOffset = new WriteOffset(); + } + + public String serialize() { + return readOffset.getReadFileName() + SPLIT_CHARACTER + String.valueOf(readOffset.getReadFileOffset()) + + SPLIT_CHARACTER + writeOffset.getWriteFileName() + SPLIT_CHARACTER + String.valueOf(writeOffset.getWriteFileOffset()); + } + + public void deserialize(String value) { + String[] values = value.split(SPLIT_CHARACTER); + if (values.length == 4) { + this.readOffset.readFileName = values[0]; + this.readOffset.readFileOffset = Long.parseLong(values[1]); + this.writeOffset.writeFileName = values[2]; + this.writeOffset.writeFileOffset = Long.parseLong(values[3]); + } + } + + public ReadOffset getReadOffset() { + return readOffset; + } + + public WriteOffset getWriteOffset() { + return writeOffset; + } + + public static class ReadOffset { + private String readFileName; + private long readFileOffset = 0; + + public String getReadFileName() { + return readFileName; + } + + public long getReadFileOffset() { + return readFileOffset; + } + + public void setReadFileName(String readFileName) { + this.readFileName = readFileName; + } + + public void setReadFileOffset(long readFileOffset) { + this.readFileOffset = readFileOffset; + } + } + + public static class WriteOffset { + private String writeFileName; + private long writeFileOffset = 0; + + public String getWriteFileName() { + return writeFileName; + } + + public long getWriteFileOffset() { + return writeFileOffset; + } + + public void setWriteFileName(String writeFileName) { + this.writeFileName = writeFileName; + } + + public void setWriteFileOffset(long writeFileOffset) { + this.writeFileOffset = writeFileOffset; + } + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/OffsetManager.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/OffsetManager.java new file mode 100644 index 0000000000..68bce15819 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/OffsetManager.java @@ -0,0 +1,147 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.buffer; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.skywalking.apm.collector.agent.stream.util.FileUtils; +import org.skywalking.apm.collector.core.util.CollectionUtils; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public enum OffsetManager { + INSTANCE; + + private final Logger logger = LoggerFactory.getLogger(OffsetManager.class); + + private static final String OFFSET_FILE_PREFIX = "offset"; + private File offsetFile; + private Offset offset; + private boolean initialized = false; + private RandomAccessFile randomAccessFile = null; + private String lastOffsetRecord = Const.EMPTY_STRING; + + public synchronized void initialize() throws IOException { + if (!initialized) { + this.offset = new Offset(); + File dataPath = new File(SegmentBufferConfig.BUFFER_PATH); + if (dataPath.mkdirs()) { + createOffsetFile(); + } else { + File[] offsetFiles = dataPath.listFiles(new PrefixFileNameFilter()); + if (CollectionUtils.isNotEmpty(offsetFiles) && offsetFiles.length > 0) { + for (int i = 0; i < offsetFiles.length; i++) { + if (i != offsetFiles.length - 1) { + offsetFiles[i].delete(); + } else { + offsetFile = offsetFiles[i]; + } + } + } else { + createOffsetFile(); + } + } + String offsetRecord = FileUtils.INSTANCE.readLastLine(offsetFile); + offset.deserialize(offsetRecord); + initialized = true; + + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> flush(), 10, 3, TimeUnit.SECONDS); + } + } + + private void createOffsetFile() throws IOException { + String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis())); + String offsetFileName = OFFSET_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX; + offsetFile = new File(SegmentBufferConfig.BUFFER_PATH + offsetFileName); + this.offset.getWriteOffset().setWriteFileName(Const.EMPTY_STRING); + this.offset.getWriteOffset().setWriteFileOffset(0); + this.offset.getReadOffset().setReadFileName(Const.EMPTY_STRING); + this.offset.getReadOffset().setReadFileOffset(0); + this.flush(); + } + + public void flush() { + String offsetRecord = offset.serialize(); + if (!lastOffsetRecord.equals(offsetRecord)) { + if (offsetFile.length() >= BufferFileConfig.BUFFER_OFFSET_MAX_FILE_SIZE) { + nextFile(); + } + FileUtils.INSTANCE.writeAppendToLast(offsetFile, randomAccessFile, offsetRecord); + lastOffsetRecord = offsetRecord; + } + } + + private void nextFile() { + String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis())); + String offsetFileName = OFFSET_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX; + File newOffsetFile = new File(SegmentBufferConfig.BUFFER_PATH + offsetFileName); + offsetFile.delete(); + offsetFile = newOffsetFile; + this.flush(); + } + + public String getReadFileName() { + return offset.getReadOffset().getReadFileName(); + } + + public long getReadFileOffset() { + return offset.getReadOffset().getReadFileOffset(); + } + + public void setReadOffset(long readFileOffset) { + offset.getReadOffset().setReadFileOffset(readFileOffset); + } + + public void setReadOffset(String readFileName, long readFileOffset) { + offset.getReadOffset().setReadFileName(readFileName); + offset.getReadOffset().setReadFileOffset(readFileOffset); + } + + public String getWriteFileName() { + return offset.getWriteOffset().getWriteFileName(); + } + + public long getWriteFileOffset() { + return offset.getWriteOffset().getWriteFileOffset(); + } + + public void setWriteOffset(String writeFileName, long writeFileOffset) { + offset.getWriteOffset().setWriteFileName(writeFileName); + offset.getWriteOffset().setWriteFileOffset(writeFileOffset); + } + + public void setWriteOffset(long writeFileOffset) { + offset.getWriteOffset().setWriteFileOffset(writeFileOffset); + } + + class PrefixFileNameFilter implements FilenameFilter { + @Override public boolean accept(File dir, String name) { + return name.startsWith(OFFSET_FILE_PREFIX); + } + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferConfig.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferConfig.java new file mode 100644 index 0000000000..b27b7290f4 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferConfig.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.buffer; + +import org.skywalking.apm.collector.core.config.SystemConfig; + +/** + * @author peng-yongsheng + */ +public class SegmentBufferConfig { + public static String BUFFER_PATH = SystemConfig.DATA_PATH + "/buffer/"; +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferManager.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferManager.java new file mode 100644 index 0000000000..938d0e9803 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferManager.java @@ -0,0 +1,102 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.buffer; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.StringUtils; +import org.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.skywalking.apm.network.proto.UpstreamSegment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public enum SegmentBufferManager { + INSTANCE; + + private final Logger logger = LoggerFactory.getLogger(SegmentBufferManager.class); + + public static final String DATA_FILE_PREFIX = "data"; + private FileOutputStream outputStream; + + public synchronized void initialize() { + logger.info("segment buffer initialize"); + try { + OffsetManager.INSTANCE.initialize(); + if (new File(SegmentBufferConfig.BUFFER_PATH).mkdirs()) { + newDataFile(); + } else { + String writeFileName = OffsetManager.INSTANCE.getWriteFileName(); + if (StringUtils.isNotEmpty(writeFileName)) { + File dataFile = new File(SegmentBufferConfig.BUFFER_PATH + writeFileName); + if (dataFile.exists()) { + outputStream = new FileOutputStream(new File(SegmentBufferConfig.BUFFER_PATH + writeFileName), true); + } else { + newDataFile(); + } + } else { + newDataFile(); + } + } + SegmentBufferReader.INSTANCE.initialize(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + + public synchronized void writeBuffer(UpstreamSegment segment) { + try { + segment.writeDelimitedTo(outputStream); + long position = outputStream.getChannel().position(); + if (position > BufferFileConfig.BUFFER_SEGMENT_MAX_FILE_SIZE) { + newDataFile(); + } else { + OffsetManager.INSTANCE.setWriteOffset(position); + } + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + + private void newDataFile() throws IOException { + logger.debug("create new segment buffer file"); + String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis())); + String writeFileName = DATA_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX; + File dataFile = new File(SegmentBufferConfig.BUFFER_PATH + writeFileName); + dataFile.createNewFile(); + OffsetManager.INSTANCE.setWriteOffset(writeFileName, 0); + try { + if (outputStream != null) { + outputStream.close(); + } + outputStream = new FileOutputStream(dataFile); + outputStream.getChannel().position(0); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + + public synchronized void flush() { + + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferReader.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferReader.java new file mode 100644 index 0000000000..a073b212b7 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferReader.java @@ -0,0 +1,147 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.buffer; + +import com.google.protobuf.CodedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.skywalking.apm.collector.agent.stream.parser.SegmentParse; +import org.skywalking.apm.collector.core.util.CollectionUtils; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.StringUtils; +import org.skywalking.apm.network.proto.UpstreamSegment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public enum SegmentBufferReader { + INSTANCE; + + private final Logger logger = LoggerFactory.getLogger(SegmentBufferReader.class); + private InputStream inputStream; + + public void initialize() { + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::preRead, 3, 3, TimeUnit.SECONDS); + } + + private void preRead() { + String readFileName = OffsetManager.INSTANCE.getReadFileName(); + if (StringUtils.isNotEmpty(readFileName)) { + File readFile = new File(SegmentBufferConfig.BUFFER_PATH + readFileName); + if (readFile.exists()) { + deleteTheDataFilesBeforeReadFile(readFileName); + long readFileOffset = OffsetManager.INSTANCE.getReadFileOffset(); + read(readFile, readFileOffset); + readEarliestCreateDataFile(); + } else { + deleteTheDataFilesBeforeReadFile(readFileName); + readEarliestCreateDataFile(); + } + } else { + readEarliestCreateDataFile(); + } + } + + private void deleteTheDataFilesBeforeReadFile(String readFileName) { + File[] dataFiles = new File(SegmentBufferConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter()); + + long readFileCreateTime = getFileCreateTime(readFileName); + for (File dataFile : dataFiles) { + long fileCreateTime = getFileCreateTime(dataFile.getName()); + if (fileCreateTime < readFileCreateTime) { + dataFile.delete(); + } else if (fileCreateTime == readFileCreateTime) { + break; + } + } + } + + private long getFileCreateTime(String fileName) { + fileName = fileName.replace(SegmentBufferManager.DATA_FILE_PREFIX + "_", Const.EMPTY_STRING); + fileName = fileName.replace("." + Const.FILE_SUFFIX, Const.EMPTY_STRING); + return Long.valueOf(fileName); + } + + private void readEarliestCreateDataFile() { + String readFileName = OffsetManager.INSTANCE.getReadFileName(); + File[] dataFiles = new File(SegmentBufferConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter()); + + if (CollectionUtils.isNotEmpty(dataFiles)) { + if (dataFiles[0].getName().equals(readFileName)) { + return; + } + } + + for (File dataFile : dataFiles) { + logger.debug("Reading segment buffer data file, file name: {}", dataFile.getAbsolutePath()); + OffsetManager.INSTANCE.setReadOffset(dataFile.getName(), 0); + if (!read(dataFile, 0)) { + break; + } + } + } + + private boolean read(File readFile, long readFileOffset) { + try { + inputStream = new FileInputStream(readFile); + inputStream.skip(readFileOffset); + + String writeFileName = OffsetManager.INSTANCE.getWriteFileName(); + long endPoint = readFile.length(); + if (writeFileName.equals(readFile.getName())) { + endPoint = OffsetManager.INSTANCE.getWriteFileOffset(); + } + + while (readFile.length() > readFileOffset && readFileOffset < endPoint) { + UpstreamSegment upstreamSegment = UpstreamSegment.parser().parseDelimitedFrom(inputStream); + SegmentParse parse = new SegmentParse(null); + if (!parse.parse(upstreamSegment, SegmentParse.Source.Buffer)) { + return false; + } + + final int serialized = upstreamSegment.getSerializedSize(); + readFileOffset = readFileOffset + CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized; + logger.debug("read segment buffer from file: {}, offset: {}, file length: {}", readFile.getName(), readFileOffset, readFile.length()); + OffsetManager.INSTANCE.setReadOffset(readFileOffset); + } + + inputStream.close(); + if (!writeFileName.equals(readFile.getName())) { + readFile.delete(); + } + } catch (IOException e) { + logger.error(e.getMessage(), e); + return false; + } + return true; + } + + class PrefixFileNameFilter implements FilenameFilter { + @Override public boolean accept(File dir, String name) { + return name.startsWith(SegmentBufferManager.DATA_FILE_PREFIX); + } + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/EntrySpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/EntrySpanListener.java new file mode 100644 index 0000000000..35aac10014 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/EntrySpanListener.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser; + +import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; + +/** + * @author peng-yongsheng + */ +public interface EntrySpanListener extends SpanListener { + void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId); +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/ExitSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/ExitSpanListener.java new file mode 100644 index 0000000000..23c939fec9 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/ExitSpanListener.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser; + +import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; + +/** + * @author peng-yongsheng + */ +public interface ExitSpanListener extends SpanListener { + void parseExit(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId); +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/FirstSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/FirstSpanListener.java new file mode 100644 index 0000000000..ea774a98b8 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/FirstSpanListener.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser; + +import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; + +/** + * @author peng-yongsheng + */ +public interface FirstSpanListener extends SpanListener { + void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId); +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/GlobalTraceIdsListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/GlobalTraceIdsListener.java new file mode 100644 index 0000000000..25f7837ff0 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/GlobalTraceIdsListener.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser; + +import org.skywalking.apm.network.proto.UniqueId; + +/** + * @author peng-yongsheng + */ +public interface GlobalTraceIdsListener extends SpanListener { + void parseGlobalTraceId(UniqueId uniqueId); +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/LocalSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/LocalSpanListener.java new file mode 100644 index 0000000000..07fe5eeb1a --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/LocalSpanListener.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser; + +import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; + +/** + * @author peng-yongsheng + */ +public interface LocalSpanListener extends SpanListener { + void parseLocal(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId); +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/RefsListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/RefsListener.java new file mode 100644 index 0000000000..d12db455f6 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/RefsListener.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser; + +import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; + +/** + * @author peng-yongsheng + */ +public interface RefsListener extends SpanListener { + void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int instanceId, String segmentId); +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java new file mode 100644 index 0000000000..62ccc94706 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java @@ -0,0 +1,219 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser; + +import com.google.protobuf.InvalidProtocolBufferException; +import java.util.ArrayList; +import java.util.List; +import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; +import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceIdExchanger; +import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentDecorator; +import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanIdExchanger; +import org.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTraceSpanListener; +import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstPerformanceSpanListener; +import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentSpanListener; +import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingSpanListener; +import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceSpanListener; +import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostSpanListener; +import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntrySpanListener; +import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferenceSpanListener; +import org.skywalking.apm.collector.cache.CacheServiceManager; +import org.skywalking.apm.collector.storage.table.segment.Segment; +import org.skywalking.apm.network.proto.SpanType; +import org.skywalking.apm.network.proto.TraceSegmentObject; +import org.skywalking.apm.network.proto.UniqueId; +import org.skywalking.apm.network.proto.UpstreamSegment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class SegmentParse { + + private final Logger logger = LoggerFactory.getLogger(SegmentParse.class); + + private final List spanListeners; + private final CacheServiceManager cacheServiceManager; + private String segmentId; + + public SegmentParse(CacheServiceManager cacheServiceManager) { + this.cacheServiceManager = cacheServiceManager; + this.spanListeners = new ArrayList<>(); + this.spanListeners.add(new NodeComponentSpanListener()); + this.spanListeners.add(new NodeMappingSpanListener()); + this.spanListeners.add(new NodeReferenceSpanListener(cacheServiceManager)); + this.spanListeners.add(new SegmentCostSpanListener(cacheServiceManager)); + this.spanListeners.add(new GlobalTraceSpanListener()); + this.spanListeners.add(new ServiceEntrySpanListener(cacheServiceManager)); + this.spanListeners.add(new ServiceReferenceSpanListener()); + this.spanListeners.add(new InstPerformanceSpanListener()); + } + + public boolean parse(UpstreamSegment segment, Source source) { + try { + List traceIds = segment.getGlobalTraceIdsList(); + TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getSegment()); + + SegmentDecorator segmentDecorator = new SegmentDecorator(segmentObject); + if (!preBuild(traceIds, segmentDecorator)) { + logger.debug("This segment id exchange not success, write to buffer file, id: {}", segmentId); + + if (source.equals(Source.Agent)) { + writeToBufferFile(segmentId, segment); + } + return false; + } else { + logger.debug("This segment id exchange success, id: {}", segmentId); + notifyListenerToBuild(); + buildSegment(segmentId, segmentDecorator.toByteArray()); + return true; + } + } catch (InvalidProtocolBufferException e) { + logger.error(e.getMessage(), e); + } + return false; + } + + private boolean preBuild(List traceIds, SegmentDecorator segmentDecorator) { + StringBuilder segmentIdBuilder = new StringBuilder(); + + for (int i = 0; i < segmentDecorator.getTraceSegmentId().getIdPartsList().size(); i++) { + if (i == 0) { + segmentIdBuilder.append(segmentDecorator.getTraceSegmentId().getIdPartsList().get(i)); + } else { + segmentIdBuilder.append(".").append(segmentDecorator.getTraceSegmentId().getIdPartsList().get(i)); + } + } + + segmentId = segmentIdBuilder.toString(); + + for (UniqueId uniqueId : traceIds) { + notifyGlobalsListener(uniqueId); + } + + int applicationId = segmentDecorator.getApplicationId(); + int applicationInstanceId = segmentDecorator.getApplicationInstanceId(); + + for (int i = 0; i < segmentDecorator.getRefsCount(); i++) { + ReferenceDecorator referenceDecorator = segmentDecorator.getRefs(i); + if (!ReferenceIdExchanger.getInstance(cacheServiceManager).exchange(referenceDecorator, applicationId)) { + return false; + } + + notifyRefsListener(referenceDecorator, applicationId, applicationInstanceId, segmentId); + } + + for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { + SpanDecorator spanDecorator = segmentDecorator.getSpans(i); + + if (!SpanIdExchanger.getInstance(cacheServiceManager).exchange(spanDecorator, applicationId)) { + return false; + } + + if (spanDecorator.getSpanId() == 0) { + notifyFirstListener(spanDecorator, applicationId, applicationInstanceId, segmentId); + } + + if (SpanType.Exit.equals(spanDecorator.getSpanType())) { + notifyExitListener(spanDecorator, applicationId, applicationInstanceId, segmentId); + } else if (SpanType.Entry.equals(spanDecorator.getSpanType())) { + notifyEntryListener(spanDecorator, applicationId, applicationInstanceId, segmentId); + } else if (SpanType.Local.equals(spanDecorator.getSpanType())) { + notifyLocalListener(spanDecorator, applicationId, applicationInstanceId, segmentId); + } else { + logger.error("span type value was unexpected, span type name: {}", spanDecorator.getSpanType().name()); + } + } + + return true; + } + + private void buildSegment(String id, byte[] dataBinary) { + Segment segment = new Segment(id); + segment.setDataBinary(dataBinary); + } + + private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) { + logger.debug("send to segment buffer write worker, id: {}", id); +// context.getClusterWorkerContext().lookup(SegmentStandardizationWorker.WorkerRole.INSTANCE).tell(upstreamSegment); + } + + private void notifyListenerToBuild() { + spanListeners.forEach(SpanListener::build); + } + + private void notifyExitListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId, + String segmentId) { + for (SpanListener listener : spanListeners) { + if (listener instanceof ExitSpanListener) { + ((ExitSpanListener)listener).parseExit(spanDecorator, applicationId, applicationInstanceId, segmentId); + } + } + } + + private void notifyEntryListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId, + String segmentId) { + for (SpanListener listener : spanListeners) { + if (listener instanceof EntrySpanListener) { + ((EntrySpanListener)listener).parseEntry(spanDecorator, applicationId, applicationInstanceId, segmentId); + } + } + } + + private void notifyLocalListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId, + String segmentId) { + for (SpanListener listener : spanListeners) { + if (listener instanceof LocalSpanListener) { + ((LocalSpanListener)listener).parseLocal(spanDecorator, applicationId, applicationInstanceId, segmentId); + } + } + } + + private void notifyFirstListener(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId, + String segmentId) { + for (SpanListener listener : spanListeners) { + if (listener instanceof FirstSpanListener) { + ((FirstSpanListener)listener).parseFirst(spanDecorator, applicationId, applicationInstanceId, segmentId); + } + } + } + + private void notifyRefsListener(ReferenceDecorator reference, int applicationId, int applicationInstanceId, + String segmentId) { + for (SpanListener listener : spanListeners) { + if (listener instanceof RefsListener) { + ((RefsListener)listener).parseRef(reference, applicationId, applicationInstanceId, segmentId); + } + } + } + + private void notifyGlobalsListener(UniqueId uniqueId) { + for (SpanListener listener : spanListeners) { + if (listener instanceof GlobalTraceIdsListener) { + ((GlobalTraceIdsListener)listener).parseGlobalTraceId(uniqueId); + } + } + } + + public enum Source { + Agent, Buffer + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SpanListener.java new file mode 100644 index 0000000000..e2885e9908 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SpanListener.java @@ -0,0 +1,26 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser; + +/** + * @author peng-yongsheng + */ +public interface SpanListener { + void build(); +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/IdExchanger.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/IdExchanger.java new file mode 100644 index 0000000000..5b36854fc3 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/IdExchanger.java @@ -0,0 +1,26 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser.standardization; + +/** + * @author peng-yongsheng + */ +public interface IdExchanger { + boolean exchange(T standardBuilder, int applicationId); +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/ReferenceDecorator.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/ReferenceDecorator.java new file mode 100644 index 0000000000..796dd6d552 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/ReferenceDecorator.java @@ -0,0 +1,190 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser.standardization; + +import org.skywalking.apm.network.proto.RefType; +import org.skywalking.apm.network.proto.TraceSegmentReference; +import org.skywalking.apm.network.proto.UniqueId; + +/** + * @author peng-yongsheng + */ +public class ReferenceDecorator implements StandardBuilder { + private boolean isOrigin = true; + private StandardBuilder standardBuilder; + private TraceSegmentReference referenceObject; + private TraceSegmentReference.Builder referenceBuilder; + + public ReferenceDecorator(TraceSegmentReference referenceObject, StandardBuilder standardBuilder) { + this.referenceObject = referenceObject; + this.standardBuilder = standardBuilder; + } + + public ReferenceDecorator(TraceSegmentReference.Builder referenceBuilder, StandardBuilder standardBuilder) { + this.referenceBuilder = referenceBuilder; + this.standardBuilder = standardBuilder; + this.isOrigin = false; + } + + public RefType getRefType() { + if (isOrigin) { + return referenceObject.getRefType(); + } else { + return referenceBuilder.getRefType(); + } + } + + public int getRefTypeValue() { + if (isOrigin) { + return referenceObject.getRefTypeValue(); + } else { + return referenceBuilder.getRefTypeValue(); + } + } + + public int getEntryServiceId() { + if (isOrigin) { + return referenceObject.getEntryServiceId(); + } else { + return referenceBuilder.getEntryServiceId(); + } + } + + public void setEntryServiceId(int value) { + if (isOrigin) { + toBuilder(); + } + referenceBuilder.setEntryServiceId(value); + } + + public String getEntryServiceName() { + if (isOrigin) { + return referenceObject.getEntryServiceName(); + } else { + return referenceBuilder.getEntryServiceName(); + } + } + + public void setEntryServiceName(String value) { + if (isOrigin) { + toBuilder(); + } + referenceBuilder.setEntryServiceName(value); + } + + public int getEntryApplicationInstanceId() { + if (isOrigin) { + return referenceObject.getEntryApplicationInstanceId(); + } else { + return referenceBuilder.getEntryApplicationInstanceId(); + } + } + + public int getParentApplicationInstanceId() { + if (isOrigin) { + return referenceObject.getParentApplicationInstanceId(); + } else { + return referenceBuilder.getParentApplicationInstanceId(); + } + } + + public int getParentServiceId() { + if (isOrigin) { + return referenceObject.getParentServiceId(); + } else { + return referenceBuilder.getParentServiceId(); + } + } + + public void setParentServiceId(int value) { + if (isOrigin) { + toBuilder(); + } + referenceBuilder.setParentServiceId(value); + } + + public int getParentSpanId() { + if (isOrigin) { + return referenceObject.getParentSpanId(); + } else { + return referenceBuilder.getParentSpanId(); + } + } + + public String getParentServiceName() { + if (isOrigin) { + return referenceObject.getParentServiceName(); + } else { + return referenceBuilder.getParentServiceName(); + } + } + + public void setParentServiceName(String value) { + if (isOrigin) { + toBuilder(); + } + referenceBuilder.setParentServiceName(value); + } + + public UniqueId getParentTraceSegmentId() { + if (isOrigin) { + return referenceObject.getParentTraceSegmentId(); + } else { + return referenceBuilder.getParentTraceSegmentId(); + } + } + + public int getNetworkAddressId() { + if (isOrigin) { + return referenceObject.getNetworkAddressId(); + } else { + return referenceBuilder.getNetworkAddressId(); + } + } + + public void setNetworkAddressId(int value) { + if (isOrigin) { + toBuilder(); + } + referenceBuilder.setNetworkAddressId(value); + } + + public String getNetworkAddress() { + if (isOrigin) { + return referenceObject.getNetworkAddress(); + } else { + return referenceBuilder.getNetworkAddress(); + } + } + + public void setNetworkAddress(String value) { + if (isOrigin) { + toBuilder(); + } + referenceBuilder.setNetworkAddress(value); + } + + @Override public void toBuilder() { + if (this.isOrigin) { + this.isOrigin = false; + referenceBuilder = referenceObject.toBuilder(); + standardBuilder.toBuilder(); + } + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/ReferenceIdExchanger.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/ReferenceIdExchanger.java new file mode 100644 index 0000000000..03f2eec12d --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/ReferenceIdExchanger.java @@ -0,0 +1,86 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser.standardization; + +import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService; +import org.skywalking.apm.collector.cache.CacheServiceManager; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class ReferenceIdExchanger implements IdExchanger { + + private final Logger logger = LoggerFactory.getLogger(ReferenceIdExchanger.class); + + private static ReferenceIdExchanger EXCHANGER; + private ServiceNameService serviceNameService; + private final CacheServiceManager cacheServiceManager; + + public static ReferenceIdExchanger getInstance(CacheServiceManager cacheServiceManager) { + if (EXCHANGER == null) { + EXCHANGER = new ReferenceIdExchanger(cacheServiceManager); + } + return EXCHANGER; + } + + public ReferenceIdExchanger(CacheServiceManager cacheServiceManager) { + this.cacheServiceManager = cacheServiceManager; + serviceNameService = new ServiceNameService(cacheServiceManager); + } + + @Override public boolean exchange(ReferenceDecorator standardBuilder, int applicationId) { + if (standardBuilder.getEntryServiceId() == 0 && StringUtils.isNotEmpty(standardBuilder.getEntryServiceName())) { + int entryServiceId = serviceNameService.getOrCreate(cacheServiceManager.getInstanceCacheService().get(standardBuilder.getEntryApplicationInstanceId()), standardBuilder.getEntryServiceName()); + if (entryServiceId == 0) { + return false; + } else { + standardBuilder.toBuilder(); + standardBuilder.setEntryServiceId(entryServiceId); + standardBuilder.setEntryServiceName(Const.EMPTY_STRING); + } + } + + if (standardBuilder.getParentServiceId() == 0 && StringUtils.isNotEmpty(standardBuilder.getParentServiceName())) { + int parentServiceId = serviceNameService.getOrCreate(cacheServiceManager.getInstanceCacheService().get(standardBuilder.getParentApplicationInstanceId()), standardBuilder.getParentServiceName()); + if (parentServiceId == 0) { + return false; + } else { + standardBuilder.toBuilder(); + standardBuilder.setParentServiceId(parentServiceId); + standardBuilder.setParentServiceName(Const.EMPTY_STRING); + } + } + + if (standardBuilder.getNetworkAddressId() == 0 && StringUtils.isNotEmpty(standardBuilder.getNetworkAddress())) { + int networkAddressId = cacheServiceManager.getApplicationCacheService().get(standardBuilder.getNetworkAddress()); + if (networkAddressId == 0) { + return false; + } else { + standardBuilder.toBuilder(); + standardBuilder.setNetworkAddressId(networkAddressId); + standardBuilder.setNetworkAddress(Const.EMPTY_STRING); + } + } + return true; + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentDecorator.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentDecorator.java new file mode 100644 index 0000000000..0d29ee479c --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentDecorator.java @@ -0,0 +1,86 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser.standardization; + +import org.skywalking.apm.network.proto.TraceSegmentObject; +import org.skywalking.apm.network.proto.UniqueId; + +/** + * @author peng-yongsheng + */ +public class SegmentDecorator implements StandardBuilder { + private boolean isOrigin = true; + private final TraceSegmentObject segmentObject; + private TraceSegmentObject.Builder segmentBuilder; + + public SegmentDecorator(TraceSegmentObject segmentObject) { + this.segmentObject = segmentObject; + } + + public int getApplicationId() { + return segmentObject.getApplicationId(); + } + + public int getApplicationInstanceId() { + return segmentObject.getApplicationInstanceId(); + } + + public UniqueId getTraceSegmentId() { + return segmentObject.getTraceSegmentId(); + } + + public int getSpansCount() { + return segmentObject.getSpansCount(); + } + + public SpanDecorator getSpans(int index) { + if (isOrigin) { + return new SpanDecorator(segmentObject.getSpans(index), this); + } else { + return new SpanDecorator(segmentBuilder.getSpansBuilder(index), this); + } + } + + public int getRefsCount() { + return segmentObject.getRefsCount(); + } + + public ReferenceDecorator getRefs(int index) { + if (isOrigin) { + return new ReferenceDecorator(segmentObject.getRefs(index), this); + } else { + return new ReferenceDecorator(segmentBuilder.getRefsBuilder(index), this); + } + } + + public byte[] toByteArray() { + if (isOrigin) { + return segmentObject.toByteArray(); + } else { + return segmentBuilder.build().toByteArray(); + } + } + + @Override public void toBuilder() { + if (isOrigin) { + this.isOrigin = false; + this.segmentBuilder = segmentObject.toBuilder(); + } + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java new file mode 100644 index 0000000000..83e379f6b0 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java @@ -0,0 +1,72 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser.standardization; + +import org.skywalking.apm.collector.agent.stream.buffer.SegmentBufferManager; +import org.skywalking.apm.collector.cache.CacheServiceManager; +import org.skywalking.apm.collector.queue.service.QueueCreatorService; +import org.skywalking.apm.collector.storage.service.DAOService; +import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker; +import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider; +import org.skywalking.apm.collector.stream.worker.base.WorkerException; +import org.skywalking.apm.network.proto.UpstreamSegment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker { + + private final Logger logger = LoggerFactory.getLogger(SegmentStandardizationWorker.class); + + public SegmentStandardizationWorker(DAOService daoService, CacheServiceManager cacheServiceManager) { + super(daoService, cacheServiceManager); + SegmentBufferManager.INSTANCE.initialize(); + } + + @Override public int id() { + return 0; + } + + @Override protected void onWork(UpstreamSegment upstreamSegment) throws WorkerException { + SegmentBufferManager.INSTANCE.writeBuffer(upstreamSegment); + } + + public final void flushAndSwitch() { + SegmentBufferManager.INSTANCE.flush(); + } + + public static class Factory extends AbstractLocalAsyncWorkerProvider { + public Factory(DAOService daoService, CacheServiceManager cacheServiceManager, + QueueCreatorService queueCreatorService) { + super(daoService, cacheServiceManager, queueCreatorService); + } + + @Override + public SegmentStandardizationWorker workerInstance(DAOService daoService, + CacheServiceManager cacheServiceManager) { + return new SegmentStandardizationWorker(getDaoService(), getCacheServiceManager()); + } + + @Override public int queueSize() { + return 1024; + } + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SpanDecorator.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SpanDecorator.java new file mode 100644 index 0000000000..a6f6fe0e14 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SpanDecorator.java @@ -0,0 +1,200 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser.standardization; + +import org.skywalking.apm.network.proto.SpanLayer; +import org.skywalking.apm.network.proto.SpanObject; +import org.skywalking.apm.network.proto.SpanType; + +/** + * @author peng-yongsheng + */ +public class SpanDecorator implements StandardBuilder { + private boolean isOrigin = true; + private StandardBuilder standardBuilder; + private SpanObject spanObject; + private SpanObject.Builder spanBuilder; + + public SpanDecorator(SpanObject spanObject, StandardBuilder standardBuilder) { + this.spanObject = spanObject; + this.standardBuilder = standardBuilder; + } + + public SpanDecorator(SpanObject.Builder spanBuilder, StandardBuilder standardBuilder) { + this.spanBuilder = spanBuilder; + this.standardBuilder = standardBuilder; + this.isOrigin = false; + } + + public int getSpanId() { + if (isOrigin) { + return spanObject.getSpanId(); + } else { + return spanBuilder.getSpanId(); + } + } + + public int getParentSpanId() { + if (isOrigin) { + return spanObject.getParentSpanId(); + } else { + return spanBuilder.getParentSpanId(); + } + } + + public SpanType getSpanType() { + if (isOrigin) { + return spanObject.getSpanType(); + } else { + return spanBuilder.getSpanType(); + } + } + + public int getSpanTypeValue() { + if (isOrigin) { + return spanObject.getSpanTypeValue(); + } else { + return spanBuilder.getSpanTypeValue(); + } + } + + public SpanLayer getSpanLayer() { + if (isOrigin) { + return spanObject.getSpanLayer(); + } else { + return spanBuilder.getSpanLayer(); + } + } + + public int getSpanLayerValue() { + if (isOrigin) { + return spanObject.getSpanLayerValue(); + } else { + return spanBuilder.getSpanLayerValue(); + } + } + + public long getStartTime() { + if (isOrigin) { + return spanObject.getStartTime(); + } else { + return spanBuilder.getStartTime(); + } + } + + public long getEndTime() { + if (isOrigin) { + return spanObject.getEndTime(); + } else { + return spanBuilder.getEndTime(); + } + } + + public int getComponentId() { + if (isOrigin) { + return spanObject.getComponentId(); + } else { + return spanBuilder.getComponentId(); + } + } + + public String getComponent() { + if (isOrigin) { + return spanObject.getComponent(); + } else { + return spanBuilder.getComponent(); + } + } + + public int getPeerId() { + if (isOrigin) { + return spanObject.getPeerId(); + } else { + return spanBuilder.getPeerId(); + } + } + + public void setPeerId(int peerId) { + if (isOrigin) { + toBuilder(); + } + spanBuilder.setPeerId(peerId); + } + + public String getPeer() { + if (isOrigin) { + return spanObject.getPeer(); + } else { + return spanBuilder.getPeer(); + } + } + + public void setPeer(String peer) { + if (isOrigin) { + toBuilder(); + } + spanBuilder.setPeer(peer); + } + + public int getOperationNameId() { + if (isOrigin) { + return spanObject.getOperationNameId(); + } else { + return spanBuilder.getOperationNameId(); + } + } + + public void setOperationNameId(int value) { + if (isOrigin) { + toBuilder(); + } + spanBuilder.setOperationNameId(value); + } + + public String getOperationName() { + if (isOrigin) { + return spanObject.getOperationName(); + } else { + return spanBuilder.getOperationName(); + } + } + + public void setOperationName(String value) { + if (isOrigin) { + toBuilder(); + } + spanBuilder.setOperationName(value); + } + + public boolean getIsError() { + if (isOrigin) { + return spanObject.getIsError(); + } else { + return spanBuilder.getIsError(); + } + } + + @Override public void toBuilder() { + if (this.isOrigin) { + this.isOrigin = false; + spanBuilder = spanObject.toBuilder(); + standardBuilder.toBuilder(); + } + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SpanIdExchanger.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SpanIdExchanger.java new file mode 100644 index 0000000000..ad0f8acaed --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SpanIdExchanger.java @@ -0,0 +1,71 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser.standardization; + +import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService; +import org.skywalking.apm.collector.cache.CacheServiceManager; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.StringUtils; + +/** + * @author peng-yongsheng + */ +public class SpanIdExchanger implements IdExchanger { + + private static SpanIdExchanger EXCHANGER; + private final ServiceNameService serviceNameService; + private final CacheServiceManager cacheServiceManager; + + public static SpanIdExchanger getInstance(CacheServiceManager cacheServiceManager) { + if (EXCHANGER == null) { + EXCHANGER = new SpanIdExchanger(cacheServiceManager); + } + return EXCHANGER; + } + + public SpanIdExchanger(CacheServiceManager cacheServiceManager) { + this.cacheServiceManager = cacheServiceManager; + this.serviceNameService = new ServiceNameService(cacheServiceManager); + } + + @Override public boolean exchange(SpanDecorator standardBuilder, int applicationId) { + if (standardBuilder.getPeerId() == 0 && StringUtils.isNotEmpty(standardBuilder.getPeer())) { + int peerId = cacheServiceManager.getApplicationCacheService().get(standardBuilder.getPeer()); + if (peerId == 0) { + return false; + } else { + standardBuilder.toBuilder(); + standardBuilder.setPeerId(peerId); + standardBuilder.setPeer(Const.EMPTY_STRING); + } + } + + if (standardBuilder.getOperationNameId() == 0 && StringUtils.isNotEmpty(standardBuilder.getOperationName())) { + int operationNameId = serviceNameService.getOrCreate(applicationId, standardBuilder.getOperationName()); + if (operationNameId == 0) { + return false; + } else { + standardBuilder.toBuilder(); + standardBuilder.setOperationNameId(operationNameId); + standardBuilder.setOperationName(Const.EMPTY_STRING); + } + } + return true; + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/StandardBuilder.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/StandardBuilder.java new file mode 100644 index 0000000000..efdc89ae76 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/StandardBuilder.java @@ -0,0 +1,26 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.parser.standardization; + +/** + * @author peng-yongsheng + */ +public interface StandardBuilder { + void toBuilder(); +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/util/FileUtils.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/util/FileUtils.java new file mode 100644 index 0000000000..02608fafbe --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/util/FileUtils.java @@ -0,0 +1,89 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.util; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import org.skywalking.apm.collector.core.util.Const; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public enum FileUtils { + INSTANCE; + + private final Logger logger = LoggerFactory.getLogger(FileUtils.class); + + public String readLastLine(File file) { + RandomAccessFile randomAccessFile = null; + try { + randomAccessFile = new RandomAccessFile(file, "r"); + long length = randomAccessFile.length(); + if (length == 0) { + return Const.EMPTY_STRING; + } else { + long position = length - 1; + randomAccessFile.seek(position); + while (position >= 0) { + if (randomAccessFile.read() == '\n') { + return randomAccessFile.readLine(); + } + randomAccessFile.seek(position); + if (position == 0) { + return randomAccessFile.readLine(); + } + position--; + } + } + } catch (IOException e) { + logger.error(e.getMessage(), e); + } finally { + if (randomAccessFile != null) { + try { + randomAccessFile.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } + return Const.EMPTY_STRING; + } + + public void writeAppendToLast(File file, RandomAccessFile randomAccessFile, String value) { + if (randomAccessFile == null) { + try { + randomAccessFile = new RandomAccessFile(file, "rwd"); + } catch (FileNotFoundException e) { + logger.error(e.getMessage(), e); + } + } + try { + long length = randomAccessFile.length(); + randomAccessFile.seek(length); + randomAccessFile.writeBytes(System.lineSeparator()); + randomAccessFile.writeBytes(value); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java new file mode 100644 index 0000000000..84608d5387 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java @@ -0,0 +1,50 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.worker.register; + +import org.skywalking.apm.collector.cache.CacheServiceManager; +import org.skywalking.apm.collector.storage.table.register.ServiceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class ServiceNameService { + + private final Logger logger = LoggerFactory.getLogger(ServiceNameService.class); + + private final CacheServiceManager cacheServiceManager; + + public ServiceNameService(CacheServiceManager cacheServiceManager) { + this.cacheServiceManager = cacheServiceManager; + } + + public int getOrCreate(int applicationId, String serviceName) { + int serviceId = cacheServiceManager.getServiceIdCacheService().get(applicationId, serviceName); + + if (serviceId == 0) { + ServiceName service = new ServiceName("0"); + service.setApplicationId(applicationId); + service.setServiceName(serviceName); + service.setServiceId(0); + } + return serviceId; + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/global/GlobalTraceSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/global/GlobalTraceSpanListener.java new file mode 100644 index 0000000000..c501e4b424 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/global/GlobalTraceSpanListener.java @@ -0,0 +1,73 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.worker.trace.global; + +import java.util.ArrayList; +import java.util.List; +import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; +import org.skywalking.apm.collector.agent.stream.parser.GlobalTraceIdsListener; +import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.skywalking.apm.collector.storage.table.global.GlobalTrace; +import org.skywalking.apm.network.proto.UniqueId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class GlobalTraceSpanListener implements FirstSpanListener, GlobalTraceIdsListener { + + private final Logger logger = LoggerFactory.getLogger(GlobalTraceSpanListener.class); + + private List globalTraceIds = new ArrayList<>(); + private String segmentId; + private long timeBucket; + + @Override + public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId, + String segmentId) { + this.timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime()); + this.segmentId = segmentId; + } + + @Override public void parseGlobalTraceId(UniqueId uniqueId) { + StringBuilder globalTraceIdBuilder = new StringBuilder(); + for (int i = 0; i < uniqueId.getIdPartsList().size(); i++) { + if (i == 0) { + globalTraceIdBuilder.append(uniqueId.getIdPartsList().get(i)); + } else { + globalTraceIdBuilder.append(".").append(uniqueId.getIdPartsList().get(i)); + } + } + globalTraceIds.add(globalTraceIdBuilder.toString()); + } + + @Override public void build() { + logger.debug("global trace listener build"); + + for (String globalTraceId : globalTraceIds) { + GlobalTrace globalTrace = new GlobalTrace(segmentId + Const.ID_SPLIT + globalTraceId); + globalTrace.setGlobalTraceId(globalTraceId); + globalTrace.setSegmentId(segmentId); + globalTrace.setTimeBucket(timeBucket); + } + } +} \ No newline at end of file diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstPerformanceSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstPerformanceSpanListener.java new file mode 100644 index 0000000000..3ed7d16627 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/instance/InstPerformanceSpanListener.java @@ -0,0 +1,64 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.worker.trace.instance; + +import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; +import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; +import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.skywalking.apm.collector.storage.table.instance.InstPerformance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class InstPerformanceSpanListener implements EntrySpanListener, FirstSpanListener { + + private final Logger logger = LoggerFactory.getLogger(InstPerformanceSpanListener.class); + + private int applicationId; + private int instanceId; + private long cost; + private long timeBucket; + + @Override + public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, + String segmentId) { + } + + @Override + public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId, + String segmentId) { + this.applicationId = applicationId; + this.instanceId = instanceId; + this.cost = spanDecorator.getEndTime() - spanDecorator.getStartTime(); + timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(spanDecorator.getStartTime()); + } + + @Override public void build() { + InstPerformance instPerformance = new InstPerformance(timeBucket + Const.ID_SPLIT + instanceId); + instPerformance.setApplicationId(applicationId); + instPerformance.setInstanceId(instanceId); + instPerformance.setCalls(1); + instPerformance.setCostTotal(cost); + instPerformance.setTimeBucket(timeBucket); + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeComponentSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeComponentSpanListener.java new file mode 100644 index 0000000000..6dbdd373e4 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeComponentSpanListener.java @@ -0,0 +1,97 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.worker.trace.node; + +import java.util.ArrayList; +import java.util.List; +import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; +import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener; +import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; +import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.skywalking.apm.collector.storage.table.node.NodeComponent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener { + + private final Logger logger = LoggerFactory.getLogger(NodeComponentSpanListener.class); + + private List nodeComponents = new ArrayList<>(); + private long timeBucket; + + @Override + public void parseExit(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) { + NodeComponent nodeComponent = new NodeComponent(Const.EMPTY_STRING); + nodeComponent.setComponentId(spanDecorator.getComponentId()); + + String id; + if (spanDecorator.getComponentId() == 0) { + nodeComponent.setComponentName(spanDecorator.getComponent()); + id = nodeComponent.getComponentName(); + } else { + nodeComponent.setComponentName(Const.EMPTY_STRING); + id = String.valueOf(nodeComponent.getComponentId()); + } + + nodeComponent.setPeerId(spanDecorator.getPeerId()); + id = id + Const.ID_SPLIT + nodeComponent.getPeerId(); + nodeComponent.setId(id); + nodeComponents.add(nodeComponent); + } + + @Override + public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, + String segmentId) { + NodeComponent nodeComponent = new NodeComponent(Const.EMPTY_STRING); + nodeComponent.setComponentId(spanDecorator.getComponentId()); + + String id; + if (spanDecorator.getComponentId() == 0) { + nodeComponent.setComponentName(spanDecorator.getComponent()); + id = nodeComponent.getComponentName(); + } else { + id = String.valueOf(nodeComponent.getComponentId()); + nodeComponent.setComponentName(Const.EMPTY_STRING); + } + + nodeComponent.setPeerId(applicationId); + id = id + Const.ID_SPLIT + String.valueOf(applicationId); + nodeComponent.setId(id); + + nodeComponents.add(nodeComponent); + } + + @Override + public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId, + String segmentId) { + timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime()); + } + + @Override public void build() { + nodeComponents.forEach(nodeComponent -> { + nodeComponent.setId(timeBucket + Const.ID_SPLIT + nodeComponent.getId()); + nodeComponent.setTimeBucket(timeBucket); + }); + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeMappingSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeMappingSpanListener.java new file mode 100644 index 0000000000..0bf66c26c4 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/node/NodeMappingSpanListener.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.worker.trace.node; + +import java.util.ArrayList; +import java.util.List; +import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; +import org.skywalking.apm.collector.agent.stream.parser.RefsListener; +import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; +import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.skywalking.apm.collector.storage.table.node.NodeMapping; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class NodeMappingSpanListener implements RefsListener, FirstSpanListener { + + private final Logger logger = LoggerFactory.getLogger(NodeMappingSpanListener.class); + + private List nodeMappings = new ArrayList<>(); + private long timeBucket; + + @Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int instanceId, + String segmentId) { + logger.debug("node mapping listener parse reference"); + NodeMapping nodeMapping = new NodeMapping(Const.EMPTY_STRING); + nodeMapping.setApplicationId(applicationId); + nodeMapping.setAddressId(referenceDecorator.getNetworkAddressId()); + String id = String.valueOf(applicationId) + Const.ID_SPLIT + String.valueOf(nodeMapping.getAddressId()); + nodeMapping.setId(id); + nodeMappings.add(nodeMapping); + } + + @Override + public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId, + String segmentId) { + timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime()); + } + + @Override public void build() { + logger.debug("node mapping listener build"); + for (NodeMapping nodeMapping : nodeMappings) { + nodeMapping.setId(timeBucket + Const.ID_SPLIT + nodeMapping.getId()); + nodeMapping.setTimeBucket(timeBucket); + logger.debug("send to node mapping aggregation worker, id: {}", nodeMapping.getId()); + } + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/noderef/NodeReferenceSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/noderef/NodeReferenceSpanListener.java new file mode 100644 index 0000000000..0cc701e316 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/noderef/NodeReferenceSpanListener.java @@ -0,0 +1,127 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.worker.trace.noderef; + +import java.util.LinkedList; +import java.util.List; +import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; +import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener; +import org.skywalking.apm.collector.agent.stream.parser.RefsListener; +import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; +import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.cache.CacheServiceManager; +import org.skywalking.apm.collector.core.util.CollectionUtils; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.skywalking.apm.collector.storage.table.noderef.NodeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class NodeReferenceSpanListener implements EntrySpanListener, ExitSpanListener, RefsListener { + + private final Logger logger = LoggerFactory.getLogger(NodeReferenceSpanListener.class); + + private final CacheServiceManager cacheServiceManager; + private final List nodeReferences; + private final List references; + + public NodeReferenceSpanListener(CacheServiceManager cacheServiceManager) { + this.cacheServiceManager = cacheServiceManager; + this.nodeReferences = new LinkedList<>(); + this.references = new LinkedList<>(); + } + + @Override + public void parseExit(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) { + NodeReference nodeReference = new NodeReference(Const.EMPTY_STRING); + nodeReference.setFrontApplicationId(applicationId); + nodeReference.setBehindApplicationId(spanDecorator.getPeerId()); + nodeReference.setTimeBucket(TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime())); + + StringBuilder idBuilder = new StringBuilder(); + idBuilder.append(nodeReference.getTimeBucket()).append(Const.ID_SPLIT).append(applicationId) + .append(Const.ID_SPLIT).append(spanDecorator.getPeerId()); + + nodeReference.setId(idBuilder.toString()); + nodeReferences.add(buildNodeRefSum(nodeReference, spanDecorator.getStartTime(), spanDecorator.getEndTime(), spanDecorator.getIsError())); + } + + @Override + public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, + String segmentId) { + if (CollectionUtils.isNotEmpty(references)) { + references.forEach(nodeReference -> { + nodeReference.setTimeBucket(TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime())); + String idBuilder = String.valueOf(nodeReference.getTimeBucket()) + Const.ID_SPLIT + nodeReference.getFrontApplicationId() + + Const.ID_SPLIT + nodeReference.getBehindApplicationId(); + + nodeReference.setId(idBuilder); + nodeReferences.add(buildNodeRefSum(nodeReference, spanDecorator.getStartTime(), spanDecorator.getEndTime(), spanDecorator.getIsError())); + }); + } else { + NodeReference nodeReference = new NodeReference(Const.EMPTY_STRING); + nodeReference.setFrontApplicationId(Const.USER_ID); + nodeReference.setBehindApplicationId(applicationId); + nodeReference.setTimeBucket(TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime())); + + String idBuilder = String.valueOf(nodeReference.getTimeBucket()) + Const.ID_SPLIT + nodeReference.getFrontApplicationId() + + Const.ID_SPLIT + nodeReference.getBehindApplicationId(); + + nodeReference.setId(idBuilder); + nodeReferences.add(buildNodeRefSum(nodeReference, spanDecorator.getStartTime(), spanDecorator.getEndTime(), spanDecorator.getIsError())); + } + } + + @Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int instanceId, + String segmentId) { + int parentApplicationId = cacheServiceManager.getInstanceCacheService().get(referenceDecorator.getParentApplicationInstanceId()); + + NodeReference referenceSum = new NodeReference(Const.EMPTY_STRING); + referenceSum.setFrontApplicationId(parentApplicationId); + referenceSum.setBehindApplicationId(applicationId); + references.add(referenceSum); + } + + @Override public void build() { + logger.debug("node reference summary listener build"); + for (NodeReference nodeReference : nodeReferences) { + } + } + + private NodeReference buildNodeRefSum(NodeReference reference, + long startTime, long endTime, boolean isError) { + long cost = endTime - startTime; + if (cost <= 1000 && !isError) { + reference.setS1Lte(1); + } else if (1000 < cost && cost <= 3000 && !isError) { + reference.setS3Lte(1); + } else if (3000 < cost && cost <= 5000 && !isError) { + reference.setS5Lte(1); + } else if (5000 < cost && !isError) { + reference.setS5Gt(1); + } else { + reference.setError(1); + } + reference.setSummary(1); + return reference; + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/segment/SegmentCostSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/segment/SegmentCostSpanListener.java new file mode 100644 index 0000000000..d656b6c279 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/segment/SegmentCostSpanListener.java @@ -0,0 +1,98 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.worker.trace.segment; + +import java.util.ArrayList; +import java.util.List; +import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; +import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener; +import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; +import org.skywalking.apm.collector.agent.stream.parser.LocalSpanListener; +import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.cache.CacheServiceManager; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.skywalking.apm.collector.storage.table.segment.SegmentCost; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener, FirstSpanListener { + + private final Logger logger = LoggerFactory.getLogger(SegmentCostSpanListener.class); + + private final List segmentCosts; + private final CacheServiceManager cacheServiceManager; + private boolean isError = false; + private long timeBucket; + + public SegmentCostSpanListener(CacheServiceManager cacheServiceManager) { + this.cacheServiceManager = cacheServiceManager; + this.segmentCosts = new ArrayList<>(); + } + + @Override + public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId, + String segmentId) { + timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime()); + + SegmentCost segmentCost = new SegmentCost(Const.EMPTY_STRING); + segmentCost.setSegmentId(segmentId); + segmentCost.setApplicationId(applicationId); + segmentCost.setCost(spanDecorator.getEndTime() - spanDecorator.getStartTime()); + segmentCost.setStartTime(spanDecorator.getStartTime()); + segmentCost.setEndTime(spanDecorator.getEndTime()); + segmentCost.setId(segmentId); + if (spanDecorator.getOperationNameId() == 0) { + segmentCost.setServiceName(spanDecorator.getOperationName()); + } else { + segmentCost.setServiceName(cacheServiceManager.getServiceNameCacheService().getSplitServiceName(cacheServiceManager.getServiceNameCacheService().get(spanDecorator.getOperationNameId()))); + } + + segmentCosts.add(segmentCost); + isError = isError || spanDecorator.getIsError(); + } + + @Override + public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, + String segmentId) { + isError = isError || spanDecorator.getIsError(); + } + + @Override + public void parseExit(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) { + isError = isError || spanDecorator.getIsError(); + } + + @Override + public void parseLocal(SpanDecorator spanDecorator, int applicationId, int instanceId, + String segmentId) { + isError = isError || spanDecorator.getIsError(); + } + + @Override public void build() { + logger.debug("segment cost listener build"); + for (SegmentCost segmentCost : segmentCosts) { + segmentCost.setIsError(isError); + segmentCost.setTimeBucket(timeBucket); + } + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java new file mode 100644 index 0000000000..3f9131b23a --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/service/ServiceEntrySpanListener.java @@ -0,0 +1,85 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.worker.trace.service; + +import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; +import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; +import org.skywalking.apm.collector.agent.stream.parser.RefsListener; +import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; +import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.cache.CacheServiceManager; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.skywalking.apm.collector.storage.table.service.ServiceEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener, EntrySpanListener { + + private final Logger logger = LoggerFactory.getLogger(ServiceEntrySpanListener.class); + + private long timeBucket; + private boolean hasReference = false; + private int applicationId; + private int entryServiceId; + private String entryServiceName; + private boolean hasEntry = false; + private final CacheServiceManager cacheServiceManager; + + public ServiceEntrySpanListener(CacheServiceManager cacheServiceManager) { + this.cacheServiceManager = cacheServiceManager; + } + + @Override + public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, + String segmentId) { + this.applicationId = applicationId; + this.entryServiceId = spanDecorator.getOperationNameId(); + this.entryServiceName = cacheServiceManager.getServiceNameCacheService().getSplitServiceName(cacheServiceManager.getServiceNameCacheService().get(entryServiceId)); + this.hasEntry = true; + } + + @Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int instanceId, + String segmentId) { + hasReference = true; + } + + @Override + public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId, + String segmentId) { + timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime()); + } + + @Override public void build() { + logger.debug("entry service listener build"); + if (!hasReference && hasEntry) { + ServiceEntry serviceEntry = new ServiceEntry(applicationId + Const.ID_SPLIT + entryServiceId); + serviceEntry.setApplicationId(applicationId); + serviceEntry.setEntryServiceId(entryServiceId); + serviceEntry.setEntryServiceName(entryServiceName); + serviceEntry.setRegisterTime(timeBucket); + serviceEntry.setNewestTime(timeBucket); + + logger.debug("send to service entry aggregation worker, id: {}", serviceEntry.getId()); + } + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/serviceref/ServiceReferenceSpanListener.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/serviceref/ServiceReferenceSpanListener.java new file mode 100644 index 0000000000..a8178014a8 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/trace/serviceref/ServiceReferenceSpanListener.java @@ -0,0 +1,132 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.worker.trace.serviceref; + +import java.util.LinkedList; +import java.util.List; +import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; +import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; +import org.skywalking.apm.collector.agent.stream.parser.RefsListener; +import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; +import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class ServiceReferenceSpanListener implements FirstSpanListener, EntrySpanListener, RefsListener { + + private final Logger logger = LoggerFactory.getLogger(ServiceReferenceSpanListener.class); + + private List referenceServices = new LinkedList<>(); + private int serviceId = 0; + private long startTime = 0; + private long endTime = 0; + private boolean isError = false; + private long timeBucket; + private boolean hasEntry = false; + + @Override + public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId, + String segmentId) { + timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime()); + } + + @Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int applicationInstanceId, + String segmentId) { + referenceServices.add(referenceDecorator); + } + + @Override + public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, + String segmentId) { + serviceId = spanDecorator.getOperationNameId(); + startTime = spanDecorator.getStartTime(); + endTime = spanDecorator.getEndTime(); + isError = spanDecorator.getIsError(); + this.hasEntry = true; + } + + private void calculateCost(ServiceReference serviceReference, long startTime, + long endTime, boolean isError) { + long cost = endTime - startTime; + if (cost <= 1000 && !isError) { + serviceReference.setS1Lte(1L); + } else if (1000 < cost && cost <= 3000 && !isError) { + serviceReference.setS3Lte(1L); + } else if (3000 < cost && cost <= 5000 && !isError) { + serviceReference.setS5Lte(1L); + } else if (5000 < cost && !isError) { + serviceReference.setS5Gt(1L); + } else { + serviceReference.setError(1L); + } + serviceReference.setSummary(1L); + serviceReference.setCostSummary(cost); + } + + @Override public void build() { + logger.debug("service reference listener build"); + if (hasEntry) { + if (referenceServices.size() > 0) { + referenceServices.forEach(reference -> { + ServiceReference serviceReference = new ServiceReference(Const.EMPTY_STRING); + int entryServiceId = reference.getEntryServiceId(); + int frontServiceId = reference.getParentServiceId(); + int behindServiceId = serviceId; + calculateCost(serviceReference, startTime, endTime, isError); + + logger.debug("has reference, entryServiceId: {}", entryServiceId); + sendToAggregationWorker(serviceReference, entryServiceId, frontServiceId, behindServiceId); + }); + } else { + ServiceReference serviceReference = new ServiceReference(Const.EMPTY_STRING); + int entryServiceId = serviceId; + int frontServiceId = Const.NONE_SERVICE_ID; + int behindServiceId = serviceId; + + calculateCost(serviceReference, startTime, endTime, isError); + sendToAggregationWorker(serviceReference, entryServiceId, frontServiceId, behindServiceId); + } + } + } + + private void sendToAggregationWorker(ServiceReference serviceReference, int entryServiceId, int frontServiceId, + int behindServiceId) { + StringBuilder idBuilder = new StringBuilder(); + idBuilder.append(timeBucket).append(Const.ID_SPLIT); + + idBuilder.append(entryServiceId).append(Const.ID_SPLIT); + serviceReference.setEntryServiceId(entryServiceId); + + idBuilder.append(frontServiceId).append(Const.ID_SPLIT); + serviceReference.setFrontServiceId(frontServiceId); + + idBuilder.append(behindServiceId); + serviceReference.setBehindServiceId(behindServiceId); + + serviceReference.setId(idBuilder.toString()); + serviceReference.setTimeBucket(timeBucket); + logger.debug("send to service reference aggregation worker, id: {}", serviceReference.getId()); + } +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/config/SystemConfig.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/config/SystemConfig.java new file mode 100644 index 0000000000..ec787f6843 --- /dev/null +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/config/SystemConfig.java @@ -0,0 +1,26 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.core.config; + +/** + * @author peng-yongsheng + */ +public class SystemConfig { + public static String DATA_PATH = "../data"; +} -- GitLab