diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java index 613619ba8267bd070769c74a9f9bf26c93014b67..2ad37d2a81197aff927ddd24ee492617c11e6f40 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java @@ -70,25 +70,36 @@ public class RegisterPersistentWorker extends AbstractWorker { if (sources.size() > 1000 || registerSource.getEndOfBatchContext().isEndOfBatch()) { sources.values().forEach(source -> { - int sequence; - if ((sequence = registerLockDAO.tryLockAndIncrement(scope)) != Const.NONE) { - try { - RegisterSource dbSource = registerDAO.get(modelName, source.id()); - if (Objects.nonNull(dbSource)) { - if (dbSource.combine(source)) { - registerDAO.forceUpdate(modelName, dbSource); + try { + RegisterSource dbSource = registerDAO.get(modelName, source.id()); + if (Objects.nonNull(dbSource)) { + if (dbSource.combine(source)) { + registerDAO.forceUpdate(modelName, dbSource); + } + } else { + int sequence; + if ((sequence = registerLockDAO.tryLockAndIncrement(scope)) != Const.NONE) { + try { + dbSource = registerDAO.get(modelName, source.id()); + if (Objects.nonNull(dbSource)) { + if (dbSource.combine(source)) { + registerDAO.forceUpdate(modelName, dbSource); + } + } else { + source.setSequence(sequence); + registerDAO.forceInsert(modelName, source); + } + } catch (Throwable t) { + logger.error(t.getMessage(), t); + } finally { + registerLockDAO.releaseLock(scope); } } else { - source.setSequence(sequence); - registerDAO.forceInsert(modelName, source); + logger.info("{} inventory register try lock and increment sequence failure.", scope.name()); } - } catch (Throwable t) { - logger.error(t.getMessage(), t); - } finally { - registerLockDAO.releaseLock(scope); } - } else { - logger.info("{} inventory register try lock and increment sequence failure.", scope.name()); + } catch (Throwable t) { + logger.error(t.getMessage(), t); } }); sources.clear(); diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferData.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferData.java new file mode 100644 index 0000000000000000000000000000000000000000..4ef90c58d0d358ae8d486f8c24628b7cc834e583 --- /dev/null +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferData.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.buffer; + +import com.google.protobuf.GeneratedMessageV3; +import lombok.*; +import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject; +import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject; + +/** + * @author peng-yongsheng + */ +@Getter +public class BufferData { + private MESSAGE_TYPE messageType; + @Setter private TraceSegmentObject v1Segment; + @Setter private SegmentObject v2Segment; + + public BufferData(MESSAGE_TYPE messageType) { + this.messageType = messageType; + } +} diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferDataCollection.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferDataCollection.java new file mode 100644 index 0000000000000000000000000000000000000000..89e48461c811086967da50d09c8d7b270f6abb5b --- /dev/null +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferDataCollection.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.buffer; + +import com.google.protobuf.GeneratedMessageV3; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author peng-yongsheng + */ +public class BufferDataCollection { + + private AtomicInteger index = new AtomicInteger(0); + private final List> bufferDataList; + + public BufferDataCollection(int size) { + this.bufferDataList = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + bufferDataList.add(null); + } + } + + public void add(BufferData bufferData) { + bufferDataList.set(index.getAndIncrement(), bufferData); + + } + + public int size() { + return index.get(); + } + + public synchronized List> export() { + List> exportData = new ArrayList<>(index.get()); + for (int i = 0; i < index.get(); i++) { + exportData.add(bufferDataList.get(i)); + } + index.set(0); + return exportData; + } +} diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java index f286a8315f073d62d63484caa451f334910fc8d0..eeae8ccbd4158bdeeeda349aeab03b483ca04597 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java @@ -121,7 +121,8 @@ public class BufferStream { return this; } - public Builder callBack(DataStreamReader.CallBack callBack) { + public Builder callBack( + DataStreamReader.CallBack callBack) { this.callBack = callBack; return this; } diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java index b8cf17bf92feddadd1db593f3a1f5e1165e87794..12849bf2a034d4be57c2b7c4ec16d8ed0a42cf3f 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java @@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.library.buffer; import com.google.protobuf.*; import java.io.*; -import java.util.Objects; +import java.util.*; import java.util.concurrent.*; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.PrefixFileFilter; @@ -38,6 +38,8 @@ public class DataStreamReader { private final Offset.ReadOffset readOffset; private final Parser parser; private final CallBack callBack; + private final int collectionSize = 100; + private final BufferDataCollection bufferDataCollection; private File readingFile; private InputStream inputStream; @@ -47,6 +49,7 @@ public class DataStreamReader { this.readOffset = readOffset; this.parser = parser; this.callBack = callBack; + this.bufferDataCollection = new BufferDataCollection<>(collectionSize); } void initialize() { @@ -114,25 +117,32 @@ public class DataStreamReader { } while (readOffset.getOffset() < readingFile.length()) { + BufferData bufferData = new BufferData<>(parser.parseDelimitedFrom(inputStream)); - MESSAGE_TYPE messageType = parser.parseDelimitedFrom(inputStream); - if (messageType != null) { - int i = 0; - while (!callBack.call(messageType)) { - try { - TimeUnit.MILLISECONDS.sleep(500); - } catch (InterruptedException e) { - logger.error(e.getMessage()); - } + if (bufferData.getMessageType() != null) { + boolean isComplete = callBack.call(bufferData); + final int serialized = bufferData.getMessageType().getSerializedSize(); + final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized; + readOffset.setOffset(readOffset.getOffset() + offset); - i++; - if (i == 10) { - break; + if (!isComplete) { + if (bufferDataCollection.size() == collectionSize) { + reCall(); } + bufferDataCollection.add(bufferData); + } + + if (logger.isDebugEnabled()) { + logger.debug("collection size: {}, max size: {}", bufferDataCollection.size(), collectionSize); + } + } else if (bufferDataCollection.size() > 0) { + reCall(); + } else { + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); } - final int serialized = messageType.getSerializedSize(); - final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized; - readOffset.setOffset(readOffset.getOffset() + offset); } } } catch (IOException e) { @@ -140,7 +150,31 @@ public class DataStreamReader { } } + private void reCall() { + int maxCycle = 10; + for (int i = 1; i <= maxCycle; i++) { + if (bufferDataCollection.size() > 0) { + List> bufferDataList = bufferDataCollection.export(); + for (BufferData data : bufferDataList) { + if (!callBack.call(data)) { + if (i != maxCycle) { + bufferDataCollection.add(data); + } + } + } + + try { + TimeUnit.MILLISECONDS.sleep(500); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } else { + break; + } + } + } + public interface CallBack { - boolean call(MESSAGE_TYPE message); + boolean call(BufferData bufferData); } } diff --git a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java index 9752de785f73f9f3088a0513121db077304d7ea1..4b7e7d6190607d6804f2a1afc25a3236173c080b 100644 --- a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java +++ b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java @@ -37,7 +37,10 @@ public class BufferStreamTestCase { builder.dataFileMaxSize(50); builder.offsetFileMaxSize(10); builder.parser(TraceSegmentObject.parser()); - builder.callBack(new SegmentParse()); + builder.callBack(bufferData -> { + logger.info("segment parse: {}", bufferData.getMessageType().getSpans(0).getSpanId()); + return false; + }); BufferStream stream = builder.build(); stream.initialize(); @@ -62,14 +65,5 @@ public class BufferStreamTestCase { TimeUnit.MILLISECONDS.sleep(50); } } - - } - - private static class SegmentParse implements DataStreamReader.CallBack { - - @Override public boolean call(TraceSegmentObject message) { - logger.info("segment parse: {}", message.getSpans(0).getSpanId()); - return true; - } } } diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java index 92e97b4fe5dbb0d1faa7e32915cb4f5e45be4879..ba6da628b69c4e75d5412082b24e5205cd3d6dff 100644 --- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java +++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java @@ -23,8 +23,7 @@ import java.util.List; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric; -import org.apache.skywalking.oap.server.library.buffer.BufferStream; -import org.apache.skywalking.oap.server.library.buffer.DataStreamReader; +import org.apache.skywalking.oap.server.library.buffer.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.*; @@ -97,11 +96,11 @@ public class MeshDataBufferFileCache implements IConsumer bufferData) { + ServiceMeshMetricDataDecorator decorator = new ServiceMeshMetricDataDecorator(bufferData.getMessageType()); if (decorator.tryMetaDataRegister()) { meshBufferFileOut.inc(); TelemetryDataDispatcher.doDispatch(decorator); diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java index eb8c692e3d0e21427273ddfea7b6c881c85f60a6..0a4bd69417ee419caa9c60ace34189f9cf2e94c2 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java @@ -22,7 +22,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import java.util.*; import lombok.Setter; import org.apache.skywalking.apm.network.language.agent.*; -import org.apache.skywalking.oap.server.library.buffer.DataStreamReader; +import org.apache.skywalking.oap.server.library.buffer.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.TimeBucketUtils; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*; @@ -66,12 +66,17 @@ public class SegmentParse { MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE); } - public boolean parse(UpstreamSegment segment, Source source) { + public boolean parse(BufferData bufferData, Source source) { createSpanListeners(); try { - List traceIds = segment.getGlobalTraceIdsList(); - TraceSegmentObject segmentObject = parseBinarySegment(segment); + UpstreamSegment upstreamSegment = bufferData.getMessageType(); + List traceIds = upstreamSegment.getGlobalTraceIdsList(); + + if (bufferData.getV1Segment() == null) { + bufferData.setV1Segment(parseBinarySegment(upstreamSegment)); + } + TraceSegmentObject segmentObject = bufferData.getV1Segment(); SegmentDecorator segmentDecorator = new SegmentDecorator(segmentObject); @@ -81,7 +86,7 @@ public class SegmentParse { } if (source.equals(Source.Agent)) { - writeToBufferFile(segmentCoreInfo.getSegmentId(), segment); + writeToBufferFile(segmentCoreInfo.getSegmentId(), upstreamSegment); } else { // from SegmentSource.Buffer TRACE_BUFFER_FILE_RETRY.inc(); @@ -127,16 +132,18 @@ public class SegmentParse { segmentCoreInfo.setDataBinary(segmentDecorator.toByteArray()); segmentCoreInfo.setV2(false); + boolean exchanged = true; + for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { SpanDecorator spanDecorator = segmentDecorator.getSpans(i); if (!SpanIdExchanger.getInstance(moduleManager).exchange(spanDecorator, segmentCoreInfo.getServiceId())) { - return false; + exchanged = false; } else { for (int j = 0; j < spanDecorator.getRefsCount(); j++) { ReferenceDecorator referenceDecorator = spanDecorator.getRefs(j); if (!ReferenceIdExchanger.getInstance(moduleManager).exchange(referenceDecorator, segmentCoreInfo.getServiceId())) { - return false; + exchanged = false; } } } @@ -150,28 +157,30 @@ public class SegmentParse { segmentCoreInfo.setError(spanDecorator.getIsError() || segmentCoreInfo.isError()); } - long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(segmentCoreInfo.getStartTime()); - segmentCoreInfo.setMinuteTimeBucket(minuteTimeBucket); + if (exchanged) { + long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(segmentCoreInfo.getStartTime()); + segmentCoreInfo.setMinuteTimeBucket(minuteTimeBucket); - for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { - SpanDecorator spanDecorator = segmentDecorator.getSpans(i); + for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { + SpanDecorator spanDecorator = segmentDecorator.getSpans(i); - if (spanDecorator.getSpanId() == 0) { - notifyFirstListener(spanDecorator); - } + if (spanDecorator.getSpanId() == 0) { + notifyFirstListener(spanDecorator); + } - if (SpanType.Exit.equals(spanDecorator.getSpanType())) { - notifyExitListener(spanDecorator); - } else if (SpanType.Entry.equals(spanDecorator.getSpanType())) { - notifyEntryListener(spanDecorator); - } else if (SpanType.Local.equals(spanDecorator.getSpanType())) { - notifyLocalListener(spanDecorator); - } else { - logger.error("span type value was unexpected, span type name: {}", spanDecorator.getSpanType().name()); + if (SpanType.Exit.equals(spanDecorator.getSpanType())) { + notifyExitListener(spanDecorator); + } else if (SpanType.Entry.equals(spanDecorator.getSpanType())) { + notifyEntryListener(spanDecorator); + } else if (SpanType.Local.equals(spanDecorator.getSpanType())) { + notifyLocalListener(spanDecorator); + } else { + logger.error("span type value was unexpected, span type name: {}", spanDecorator.getSpanType().name()); + } } } - return true; + return exchanged; } private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) { @@ -251,13 +260,13 @@ public class SegmentParse { public void send(UpstreamSegment segment, Source source) { SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager); segmentParse.setStandardizationWorker(standardizationWorker); - segmentParse.parse(segment, source); + segmentParse.parse(new BufferData<>(segment), source); } - @Override public boolean call(UpstreamSegment segment) { + @Override public boolean call(BufferData bufferData) { SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager); segmentParse.setStandardizationWorker(standardizationWorker); - boolean parseResult = segmentParse.parse(segment, Source.Buffer); + boolean parseResult = segmentParse.parse(bufferData, Source.Buffer); if (parseResult) { segmentParse.TRACE_BUFFER_FILE_OUT.inc(); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java index be074edb73d345db7f79ee2a5b32bd93fc2a64f9..dd28ba4639ae1aa79a5c66d301377cc1e7c56789 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java @@ -23,7 +23,7 @@ import java.util.*; import lombok.Setter; import org.apache.skywalking.apm.network.language.agent.*; import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject; -import org.apache.skywalking.oap.server.library.buffer.DataStreamReader; +import org.apache.skywalking.oap.server.library.buffer.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.TimeBucketUtils; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*; @@ -71,12 +71,18 @@ public class SegmentParseV2 { } } - public boolean parse(UpstreamSegment segment, SegmentSource source) { + public boolean parse(BufferData bufferData, SegmentSource source) { createSpanListeners(); try { - List traceIds = segment.getGlobalTraceIdsList(); - SegmentObject segmentObject = parseBinarySegment(segment); + UpstreamSegment upstreamSegment = bufferData.getMessageType(); + + List traceIds = upstreamSegment.getGlobalTraceIdsList(); + + if (bufferData.getV2Segment() == null) { + bufferData.setV2Segment(parseBinarySegment(upstreamSegment)); + } + SegmentObject segmentObject = parseBinarySegment(upstreamSegment); SegmentDecorator segmentDecorator = new SegmentDecorator(segmentObject); @@ -86,7 +92,7 @@ public class SegmentParseV2 { } if (source.equals(SegmentSource.Agent)) { - writeToBufferFile(segmentCoreInfo.getSegmentId(), segment); + writeToBufferFile(segmentCoreInfo.getSegmentId(), upstreamSegment); } else { // from SegmentSource.Buffer TRACE_BUFFER_FILE_RETRY.inc(); @@ -132,16 +138,18 @@ public class SegmentParseV2 { segmentCoreInfo.setDataBinary(segmentDecorator.toByteArray()); segmentCoreInfo.setV2(true); + boolean exchanged = true; + for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { SpanDecorator spanDecorator = segmentDecorator.getSpans(i); if (!SpanIdExchanger.getInstance(moduleManager).exchange(spanDecorator, segmentCoreInfo.getServiceId())) { - return false; + exchanged = false; } else { for (int j = 0; j < spanDecorator.getRefsCount(); j++) { ReferenceDecorator referenceDecorator = spanDecorator.getRefs(j); if (!ReferenceIdExchanger.getInstance(moduleManager).exchange(referenceDecorator, segmentCoreInfo.getServiceId())) { - return false; + exchanged = false; } } } @@ -155,28 +163,30 @@ public class SegmentParseV2 { segmentCoreInfo.setError(spanDecorator.getIsError() || segmentCoreInfo.isError()); } - long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(segmentCoreInfo.getStartTime()); - segmentCoreInfo.setMinuteTimeBucket(minuteTimeBucket); + if (exchanged) { + long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(segmentCoreInfo.getStartTime()); + segmentCoreInfo.setMinuteTimeBucket(minuteTimeBucket); - for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { - SpanDecorator spanDecorator = segmentDecorator.getSpans(i); + for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { + SpanDecorator spanDecorator = segmentDecorator.getSpans(i); - if (spanDecorator.getSpanId() == 0) { - notifyFirstListener(spanDecorator); - } + if (spanDecorator.getSpanId() == 0) { + notifyFirstListener(spanDecorator); + } - if (SpanType.Exit.equals(spanDecorator.getSpanType())) { - notifyExitListener(spanDecorator); - } else if (SpanType.Entry.equals(spanDecorator.getSpanType())) { - notifyEntryListener(spanDecorator); - } else if (SpanType.Local.equals(spanDecorator.getSpanType())) { - notifyLocalListener(spanDecorator); - } else { - logger.error("span type value was unexpected, span type name: {}", spanDecorator.getSpanType().name()); + if (SpanType.Exit.equals(spanDecorator.getSpanType())) { + notifyExitListener(spanDecorator); + } else if (SpanType.Entry.equals(spanDecorator.getSpanType())) { + notifyEntryListener(spanDecorator); + } else if (SpanType.Local.equals(spanDecorator.getSpanType())) { + notifyLocalListener(spanDecorator); + } else { + logger.error("span type value was unexpected, span type name: {}", spanDecorator.getSpanType().name()); + } } } - return true; + return exchanged; } private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) { @@ -252,13 +262,13 @@ public class SegmentParseV2 { public void send(UpstreamSegment segment, SegmentSource source) { SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager); segmentParse.setStandardizationWorker(standardizationWorker); - segmentParse.parse(segment, source); + segmentParse.parse(new BufferData<>(segment), source); } - @Override public boolean call(UpstreamSegment segment) { + @Override public boolean call(BufferData bufferData) { SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager); segmentParse.setStandardizationWorker(standardizationWorker); - boolean parseResult = segmentParse.parse(segment, SegmentSource.Buffer); + boolean parseResult = segmentParse.parse(bufferData, SegmentSource.Buffer); if (parseResult) { segmentParse.TRACE_BUFFER_FILE_OUT.inc(); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java index decbdde93adb9fd93d5a3c5f68fc47204c363194..2e8b98ce5635579f2f60c15b8b3e4a50b75da91d 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java @@ -53,6 +53,8 @@ public class ReferenceIdExchanger implements IdExchanger { } @Override public boolean exchange(ReferenceDecorator standardBuilder, int serviceId) { + boolean exchanged = true; + if (standardBuilder.getEntryEndpointId() == 0) { String entryEndpointName = Strings.isNullOrEmpty(standardBuilder.getEntryEndpointName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder.getEntryEndpointName(); int entryServiceId = serviceInstanceInventoryCache.get(standardBuilder.getEntryServiceInstanceId()).getServiceId(); @@ -61,7 +63,8 @@ public class ReferenceIdExchanger implements IdExchanger { if (logger.isDebugEnabled()) { logger.debug("entry endpoint name: {} from service id: {} exchange failed", entryEndpointName, entryServiceId); } - return false; + + exchanged = false; } else { standardBuilder.toBuilder(); standardBuilder.setEntryEndpointId(entryEndpointId); @@ -78,7 +81,8 @@ public class ReferenceIdExchanger implements IdExchanger { if (logger.isDebugEnabled()) { logger.debug("parent endpoint name: {} from service id: {} exchange failed", parentEndpointName, parentServiceId); } - return false; + + exchanged = false; } else { standardBuilder.toBuilder(); standardBuilder.setParentEndpointId(parentEndpointId); @@ -93,14 +97,15 @@ public class ReferenceIdExchanger implements IdExchanger { if (logger.isDebugEnabled()) { logger.debug("network getAddress: {} from service id: {} exchange failed", standardBuilder.getNetworkAddress(), serviceId); } - return false; + + exchanged = false; } else { standardBuilder.toBuilder(); standardBuilder.setNetworkAddressId(networkAddressId); standardBuilder.setNetworkAddress(Const.EMPTY_STRING); } } - return true; + return exchanged; } /** diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java index dfeeb33af275427de0ed38dd2c5d58bbd9e498a4..42bf56c8d1e27784ff5725114656ce24097a3c1b 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java @@ -63,6 +63,8 @@ public class SpanIdExchanger implements IdExchanger { } @Override public boolean exchange(SpanDecorator standardBuilder, int serviceId) { + boolean exchanged = true; + if (standardBuilder.getComponentId() == 0 && !Strings.isNullOrEmpty(standardBuilder.getComponent())) { int componentId = componentLibraryCatalogService.getComponentId(standardBuilder.getComponent()); @@ -70,7 +72,8 @@ public class SpanIdExchanger implements IdExchanger { if (logger.isDebugEnabled()) { logger.debug("component: {} in service: {} exchange failed", standardBuilder.getComponent(), serviceId); } - return false; + + exchanged = false; } else { standardBuilder.toBuilder(); standardBuilder.setComponentId(componentId); @@ -86,7 +89,8 @@ public class SpanIdExchanger implements IdExchanger { if (logger.isDebugEnabled()) { logger.debug("peer: {} in service: {} exchange failed", standardBuilder.getPeer(), serviceId); } - return false; + + exchanged = false; } else { standardBuilder.toBuilder(); standardBuilder.setPeerId(peerId); @@ -123,14 +127,15 @@ public class SpanIdExchanger implements IdExchanger { if (logger.isDebugEnabled()) { logger.debug("endpoint name: {} from service id: {} exchange failed", endpointName, serviceId); } - return false; + + exchanged = false; } else { standardBuilder.toBuilder(); standardBuilder.setOperationNameId(endpointId); standardBuilder.setOperationName(Const.EMPTY_STRING); } } - return true; + return exchanged; } private JsonObject buildServiceProperties(SpanDecorator standardBuilder) { diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java index 53e65574b3005ac4a3da319e1957b8aa8df909a4..2dec7cc497300f7101d98d0a1b04a020ae4690f4 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java @@ -43,10 +43,18 @@ public class RegisterLockInstaller { } public void install() throws StorageException { + boolean debug = System.getProperty("debug") != null; + try { if (!client.isExistsIndex(RegisterLockIndex.NAME)) { + logger.info("table: {} does not exist", RegisterLockIndex.NAME); + createIndex(); + } else if (debug) { + logger.info("table: {} exists", RegisterLockIndex.NAME); + deleteIndex(); createIndex(); } + for (Class registerSource : InventoryProcess.INSTANCE.getAllRegisterSources()) { Scope sourceScope = StorageEntityAnnotationUtils.getSourceScope(registerSource); putIfAbsent(sourceScope.ordinal()); @@ -56,6 +64,10 @@ public class RegisterLockInstaller { } } + private void deleteIndex() throws IOException { + client.deleteIndex(RegisterLockIndex.NAME); + } + private void createIndex() throws IOException { Settings settings = Settings.builder() .put("index.number_of_shards", 1)