From 9810c5e308e059a7f6fd913a74f80a4c261a28b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com> Date: Mon, 21 Jan 2019 04:21:45 +0800 Subject: [PATCH] Improve buffer reader speed. (#2188) * 1. Sleep 500 milliseconds after a batch re-call finish. 2. Re-create register lock index when the system property named debug is setting. 3. Get the register inventory before lock to avoid increment the sequence but not use. 4. Return the exchange flag after all the references and spans in one segment parsed to reduce the number of segment parse. 5. Put the not exchanged segment into a collection then try to exchange no more than 10 times because of the exchange is asynchronous. 6. Cache the segment object to avoid repeated deserialization. #2185 * #2185 1. Sleep 500 milliseconds after a batch re-call finish. 2. Re-create register lock index when the system property named debug is setting. 3. Get the register inventory before lock to avoid increment the sequence but not use. --- .../worker/RegisterPersistentWorker.java | 41 +++++++---- .../oap/server/library/buffer/BufferData.java | 38 +++++++++++ .../library/buffer/BufferDataCollection.java | 57 ++++++++++++++++ .../server/library/buffer/BufferStream.java | 3 +- .../library/buffer/DataStreamReader.java | 68 ++++++++++++++----- .../library/buffer/BufferStreamTestCase.java | 14 ++-- .../mesh/MeshDataBufferFileCache.java | 9 ++- .../trace/provider/parser/SegmentParse.java | 61 ++++++++++------- .../trace/provider/parser/SegmentParseV2.java | 62 ++++++++++------- .../standardization/ReferenceIdExchanger.java | 13 ++-- .../standardization/SpanIdExchanger.java | 13 ++-- .../lock/RegisterLockInstaller.java | 12 ++++ 12 files changed, 283 insertions(+), 108 deletions(-) create mode 100644 oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferData.java create mode 100644 oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferDataCollection.java diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java index 613619ba82..2ad37d2a81 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java @@ -70,25 +70,36 @@ public class RegisterPersistentWorker extends AbstractWorker { if (sources.size() > 1000 || registerSource.getEndOfBatchContext().isEndOfBatch()) { sources.values().forEach(source -> { - int sequence; - if ((sequence = registerLockDAO.tryLockAndIncrement(scope)) != Const.NONE) { - try { - RegisterSource dbSource = registerDAO.get(modelName, source.id()); - if (Objects.nonNull(dbSource)) { - if (dbSource.combine(source)) { - registerDAO.forceUpdate(modelName, dbSource); + try { + RegisterSource dbSource = registerDAO.get(modelName, source.id()); + if (Objects.nonNull(dbSource)) { + if (dbSource.combine(source)) { + registerDAO.forceUpdate(modelName, dbSource); + } + } else { + int sequence; + if ((sequence = registerLockDAO.tryLockAndIncrement(scope)) != Const.NONE) { + try { + dbSource = registerDAO.get(modelName, source.id()); + if (Objects.nonNull(dbSource)) { + if (dbSource.combine(source)) { + registerDAO.forceUpdate(modelName, dbSource); + } + } else { + source.setSequence(sequence); + registerDAO.forceInsert(modelName, source); + } + } catch (Throwable t) { + logger.error(t.getMessage(), t); + } finally { + registerLockDAO.releaseLock(scope); } } else { - source.setSequence(sequence); - registerDAO.forceInsert(modelName, source); + logger.info("{} inventory register try lock and increment sequence failure.", scope.name()); } - } catch (Throwable t) { - logger.error(t.getMessage(), t); - } finally { - registerLockDAO.releaseLock(scope); } - } else { - logger.info("{} inventory register try lock and increment sequence failure.", scope.name()); + } catch (Throwable t) { + logger.error(t.getMessage(), t); } }); sources.clear(); diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferData.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferData.java new file mode 100644 index 0000000000..4ef90c58d0 --- /dev/null +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferData.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.buffer; + +import com.google.protobuf.GeneratedMessageV3; +import lombok.*; +import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject; +import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject; + +/** + * @author peng-yongsheng + */ +@Getter +public class BufferData { + private MESSAGE_TYPE messageType; + @Setter private TraceSegmentObject v1Segment; + @Setter private SegmentObject v2Segment; + + public BufferData(MESSAGE_TYPE messageType) { + this.messageType = messageType; + } +} diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferDataCollection.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferDataCollection.java new file mode 100644 index 0000000000..89e48461c8 --- /dev/null +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferDataCollection.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.buffer; + +import com.google.protobuf.GeneratedMessageV3; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author peng-yongsheng + */ +public class BufferDataCollection { + + private AtomicInteger index = new AtomicInteger(0); + private final List> bufferDataList; + + public BufferDataCollection(int size) { + this.bufferDataList = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + bufferDataList.add(null); + } + } + + public void add(BufferData bufferData) { + bufferDataList.set(index.getAndIncrement(), bufferData); + + } + + public int size() { + return index.get(); + } + + public synchronized List> export() { + List> exportData = new ArrayList<>(index.get()); + for (int i = 0; i < index.get(); i++) { + exportData.add(bufferDataList.get(i)); + } + index.set(0); + return exportData; + } +} diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java index f286a8315f..eeae8ccbd4 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java @@ -121,7 +121,8 @@ public class BufferStream { return this; } - public Builder callBack(DataStreamReader.CallBack callBack) { + public Builder callBack( + DataStreamReader.CallBack callBack) { this.callBack = callBack; return this; } diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java index b8cf17bf92..12849bf2a0 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java @@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.library.buffer; import com.google.protobuf.*; import java.io.*; -import java.util.Objects; +import java.util.*; import java.util.concurrent.*; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.PrefixFileFilter; @@ -38,6 +38,8 @@ public class DataStreamReader { private final Offset.ReadOffset readOffset; private final Parser parser; private final CallBack callBack; + private final int collectionSize = 100; + private final BufferDataCollection bufferDataCollection; private File readingFile; private InputStream inputStream; @@ -47,6 +49,7 @@ public class DataStreamReader { this.readOffset = readOffset; this.parser = parser; this.callBack = callBack; + this.bufferDataCollection = new BufferDataCollection<>(collectionSize); } void initialize() { @@ -114,25 +117,32 @@ public class DataStreamReader { } while (readOffset.getOffset() < readingFile.length()) { + BufferData bufferData = new BufferData<>(parser.parseDelimitedFrom(inputStream)); - MESSAGE_TYPE messageType = parser.parseDelimitedFrom(inputStream); - if (messageType != null) { - int i = 0; - while (!callBack.call(messageType)) { - try { - TimeUnit.MILLISECONDS.sleep(500); - } catch (InterruptedException e) { - logger.error(e.getMessage()); - } + if (bufferData.getMessageType() != null) { + boolean isComplete = callBack.call(bufferData); + final int serialized = bufferData.getMessageType().getSerializedSize(); + final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized; + readOffset.setOffset(readOffset.getOffset() + offset); - i++; - if (i == 10) { - break; + if (!isComplete) { + if (bufferDataCollection.size() == collectionSize) { + reCall(); } + bufferDataCollection.add(bufferData); + } + + if (logger.isDebugEnabled()) { + logger.debug("collection size: {}, max size: {}", bufferDataCollection.size(), collectionSize); + } + } else if (bufferDataCollection.size() > 0) { + reCall(); + } else { + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); } - final int serialized = messageType.getSerializedSize(); - final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized; - readOffset.setOffset(readOffset.getOffset() + offset); } } } catch (IOException e) { @@ -140,7 +150,31 @@ public class DataStreamReader { } } + private void reCall() { + int maxCycle = 10; + for (int i = 1; i <= maxCycle; i++) { + if (bufferDataCollection.size() > 0) { + List> bufferDataList = bufferDataCollection.export(); + for (BufferData data : bufferDataList) { + if (!callBack.call(data)) { + if (i != maxCycle) { + bufferDataCollection.add(data); + } + } + } + + try { + TimeUnit.MILLISECONDS.sleep(500); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } else { + break; + } + } + } + public interface CallBack { - boolean call(MESSAGE_TYPE message); + boolean call(BufferData bufferData); } } diff --git a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java index 9752de785f..4b7e7d6190 100644 --- a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java +++ b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java @@ -37,7 +37,10 @@ public class BufferStreamTestCase { builder.dataFileMaxSize(50); builder.offsetFileMaxSize(10); builder.parser(TraceSegmentObject.parser()); - builder.callBack(new SegmentParse()); + builder.callBack(bufferData -> { + logger.info("segment parse: {}", bufferData.getMessageType().getSpans(0).getSpanId()); + return false; + }); BufferStream stream = builder.build(); stream.initialize(); @@ -62,14 +65,5 @@ public class BufferStreamTestCase { TimeUnit.MILLISECONDS.sleep(50); } } - - } - - private static class SegmentParse implements DataStreamReader.CallBack { - - @Override public boolean call(TraceSegmentObject message) { - logger.info("segment parse: {}", message.getSpans(0).getSpanId()); - return true; - } } } diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java index 92e97b4fe5..ba6da628b6 100644 --- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java +++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java @@ -23,8 +23,7 @@ import java.util.List; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric; -import org.apache.skywalking.oap.server.library.buffer.BufferStream; -import org.apache.skywalking.oap.server.library.buffer.DataStreamReader; +import org.apache.skywalking.oap.server.library.buffer.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.*; @@ -97,11 +96,11 @@ public class MeshDataBufferFileCache implements IConsumer bufferData) { + ServiceMeshMetricDataDecorator decorator = new ServiceMeshMetricDataDecorator(bufferData.getMessageType()); if (decorator.tryMetaDataRegister()) { meshBufferFileOut.inc(); TelemetryDataDispatcher.doDispatch(decorator); diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java index eb8c692e3d..0a4bd69417 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java @@ -22,7 +22,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import java.util.*; import lombok.Setter; import org.apache.skywalking.apm.network.language.agent.*; -import org.apache.skywalking.oap.server.library.buffer.DataStreamReader; +import org.apache.skywalking.oap.server.library.buffer.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.TimeBucketUtils; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*; @@ -66,12 +66,17 @@ public class SegmentParse { MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE); } - public boolean parse(UpstreamSegment segment, Source source) { + public boolean parse(BufferData bufferData, Source source) { createSpanListeners(); try { - List traceIds = segment.getGlobalTraceIdsList(); - TraceSegmentObject segmentObject = parseBinarySegment(segment); + UpstreamSegment upstreamSegment = bufferData.getMessageType(); + List traceIds = upstreamSegment.getGlobalTraceIdsList(); + + if (bufferData.getV1Segment() == null) { + bufferData.setV1Segment(parseBinarySegment(upstreamSegment)); + } + TraceSegmentObject segmentObject = bufferData.getV1Segment(); SegmentDecorator segmentDecorator = new SegmentDecorator(segmentObject); @@ -81,7 +86,7 @@ public class SegmentParse { } if (source.equals(Source.Agent)) { - writeToBufferFile(segmentCoreInfo.getSegmentId(), segment); + writeToBufferFile(segmentCoreInfo.getSegmentId(), upstreamSegment); } else { // from SegmentSource.Buffer TRACE_BUFFER_FILE_RETRY.inc(); @@ -127,16 +132,18 @@ public class SegmentParse { segmentCoreInfo.setDataBinary(segmentDecorator.toByteArray()); segmentCoreInfo.setV2(false); + boolean exchanged = true; + for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { SpanDecorator spanDecorator = segmentDecorator.getSpans(i); if (!SpanIdExchanger.getInstance(moduleManager).exchange(spanDecorator, segmentCoreInfo.getServiceId())) { - return false; + exchanged = false; } else { for (int j = 0; j < spanDecorator.getRefsCount(); j++) { ReferenceDecorator referenceDecorator = spanDecorator.getRefs(j); if (!ReferenceIdExchanger.getInstance(moduleManager).exchange(referenceDecorator, segmentCoreInfo.getServiceId())) { - return false; + exchanged = false; } } } @@ -150,28 +157,30 @@ public class SegmentParse { segmentCoreInfo.setError(spanDecorator.getIsError() || segmentCoreInfo.isError()); } - long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(segmentCoreInfo.getStartTime()); - segmentCoreInfo.setMinuteTimeBucket(minuteTimeBucket); + if (exchanged) { + long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(segmentCoreInfo.getStartTime()); + segmentCoreInfo.setMinuteTimeBucket(minuteTimeBucket); - for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { - SpanDecorator spanDecorator = segmentDecorator.getSpans(i); + for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { + SpanDecorator spanDecorator = segmentDecorator.getSpans(i); - if (spanDecorator.getSpanId() == 0) { - notifyFirstListener(spanDecorator); - } + if (spanDecorator.getSpanId() == 0) { + notifyFirstListener(spanDecorator); + } - if (SpanType.Exit.equals(spanDecorator.getSpanType())) { - notifyExitListener(spanDecorator); - } else if (SpanType.Entry.equals(spanDecorator.getSpanType())) { - notifyEntryListener(spanDecorator); - } else if (SpanType.Local.equals(spanDecorator.getSpanType())) { - notifyLocalListener(spanDecorator); - } else { - logger.error("span type value was unexpected, span type name: {}", spanDecorator.getSpanType().name()); + if (SpanType.Exit.equals(spanDecorator.getSpanType())) { + notifyExitListener(spanDecorator); + } else if (SpanType.Entry.equals(spanDecorator.getSpanType())) { + notifyEntryListener(spanDecorator); + } else if (SpanType.Local.equals(spanDecorator.getSpanType())) { + notifyLocalListener(spanDecorator); + } else { + logger.error("span type value was unexpected, span type name: {}", spanDecorator.getSpanType().name()); + } } } - return true; + return exchanged; } private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) { @@ -251,13 +260,13 @@ public class SegmentParse { public void send(UpstreamSegment segment, Source source) { SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager); segmentParse.setStandardizationWorker(standardizationWorker); - segmentParse.parse(segment, source); + segmentParse.parse(new BufferData<>(segment), source); } - @Override public boolean call(UpstreamSegment segment) { + @Override public boolean call(BufferData bufferData) { SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager); segmentParse.setStandardizationWorker(standardizationWorker); - boolean parseResult = segmentParse.parse(segment, Source.Buffer); + boolean parseResult = segmentParse.parse(bufferData, Source.Buffer); if (parseResult) { segmentParse.TRACE_BUFFER_FILE_OUT.inc(); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java index be074edb73..dd28ba4639 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java @@ -23,7 +23,7 @@ import java.util.*; import lombok.Setter; import org.apache.skywalking.apm.network.language.agent.*; import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject; -import org.apache.skywalking.oap.server.library.buffer.DataStreamReader; +import org.apache.skywalking.oap.server.library.buffer.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.TimeBucketUtils; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*; @@ -71,12 +71,18 @@ public class SegmentParseV2 { } } - public boolean parse(UpstreamSegment segment, SegmentSource source) { + public boolean parse(BufferData bufferData, SegmentSource source) { createSpanListeners(); try { - List traceIds = segment.getGlobalTraceIdsList(); - SegmentObject segmentObject = parseBinarySegment(segment); + UpstreamSegment upstreamSegment = bufferData.getMessageType(); + + List traceIds = upstreamSegment.getGlobalTraceIdsList(); + + if (bufferData.getV2Segment() == null) { + bufferData.setV2Segment(parseBinarySegment(upstreamSegment)); + } + SegmentObject segmentObject = parseBinarySegment(upstreamSegment); SegmentDecorator segmentDecorator = new SegmentDecorator(segmentObject); @@ -86,7 +92,7 @@ public class SegmentParseV2 { } if (source.equals(SegmentSource.Agent)) { - writeToBufferFile(segmentCoreInfo.getSegmentId(), segment); + writeToBufferFile(segmentCoreInfo.getSegmentId(), upstreamSegment); } else { // from SegmentSource.Buffer TRACE_BUFFER_FILE_RETRY.inc(); @@ -132,16 +138,18 @@ public class SegmentParseV2 { segmentCoreInfo.setDataBinary(segmentDecorator.toByteArray()); segmentCoreInfo.setV2(true); + boolean exchanged = true; + for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { SpanDecorator spanDecorator = segmentDecorator.getSpans(i); if (!SpanIdExchanger.getInstance(moduleManager).exchange(spanDecorator, segmentCoreInfo.getServiceId())) { - return false; + exchanged = false; } else { for (int j = 0; j < spanDecorator.getRefsCount(); j++) { ReferenceDecorator referenceDecorator = spanDecorator.getRefs(j); if (!ReferenceIdExchanger.getInstance(moduleManager).exchange(referenceDecorator, segmentCoreInfo.getServiceId())) { - return false; + exchanged = false; } } } @@ -155,28 +163,30 @@ public class SegmentParseV2 { segmentCoreInfo.setError(spanDecorator.getIsError() || segmentCoreInfo.isError()); } - long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(segmentCoreInfo.getStartTime()); - segmentCoreInfo.setMinuteTimeBucket(minuteTimeBucket); + if (exchanged) { + long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(segmentCoreInfo.getStartTime()); + segmentCoreInfo.setMinuteTimeBucket(minuteTimeBucket); - for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { - SpanDecorator spanDecorator = segmentDecorator.getSpans(i); + for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { + SpanDecorator spanDecorator = segmentDecorator.getSpans(i); - if (spanDecorator.getSpanId() == 0) { - notifyFirstListener(spanDecorator); - } + if (spanDecorator.getSpanId() == 0) { + notifyFirstListener(spanDecorator); + } - if (SpanType.Exit.equals(spanDecorator.getSpanType())) { - notifyExitListener(spanDecorator); - } else if (SpanType.Entry.equals(spanDecorator.getSpanType())) { - notifyEntryListener(spanDecorator); - } else if (SpanType.Local.equals(spanDecorator.getSpanType())) { - notifyLocalListener(spanDecorator); - } else { - logger.error("span type value was unexpected, span type name: {}", spanDecorator.getSpanType().name()); + if (SpanType.Exit.equals(spanDecorator.getSpanType())) { + notifyExitListener(spanDecorator); + } else if (SpanType.Entry.equals(spanDecorator.getSpanType())) { + notifyEntryListener(spanDecorator); + } else if (SpanType.Local.equals(spanDecorator.getSpanType())) { + notifyLocalListener(spanDecorator); + } else { + logger.error("span type value was unexpected, span type name: {}", spanDecorator.getSpanType().name()); + } } } - return true; + return exchanged; } private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) { @@ -252,13 +262,13 @@ public class SegmentParseV2 { public void send(UpstreamSegment segment, SegmentSource source) { SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager); segmentParse.setStandardizationWorker(standardizationWorker); - segmentParse.parse(segment, source); + segmentParse.parse(new BufferData<>(segment), source); } - @Override public boolean call(UpstreamSegment segment) { + @Override public boolean call(BufferData bufferData) { SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager); segmentParse.setStandardizationWorker(standardizationWorker); - boolean parseResult = segmentParse.parse(segment, SegmentSource.Buffer); + boolean parseResult = segmentParse.parse(bufferData, SegmentSource.Buffer); if (parseResult) { segmentParse.TRACE_BUFFER_FILE_OUT.inc(); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java index decbdde93a..2e8b98ce56 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java @@ -53,6 +53,8 @@ public class ReferenceIdExchanger implements IdExchanger { } @Override public boolean exchange(ReferenceDecorator standardBuilder, int serviceId) { + boolean exchanged = true; + if (standardBuilder.getEntryEndpointId() == 0) { String entryEndpointName = Strings.isNullOrEmpty(standardBuilder.getEntryEndpointName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder.getEntryEndpointName(); int entryServiceId = serviceInstanceInventoryCache.get(standardBuilder.getEntryServiceInstanceId()).getServiceId(); @@ -61,7 +63,8 @@ public class ReferenceIdExchanger implements IdExchanger { if (logger.isDebugEnabled()) { logger.debug("entry endpoint name: {} from service id: {} exchange failed", entryEndpointName, entryServiceId); } - return false; + + exchanged = false; } else { standardBuilder.toBuilder(); standardBuilder.setEntryEndpointId(entryEndpointId); @@ -78,7 +81,8 @@ public class ReferenceIdExchanger implements IdExchanger { if (logger.isDebugEnabled()) { logger.debug("parent endpoint name: {} from service id: {} exchange failed", parentEndpointName, parentServiceId); } - return false; + + exchanged = false; } else { standardBuilder.toBuilder(); standardBuilder.setParentEndpointId(parentEndpointId); @@ -93,14 +97,15 @@ public class ReferenceIdExchanger implements IdExchanger { if (logger.isDebugEnabled()) { logger.debug("network getAddress: {} from service id: {} exchange failed", standardBuilder.getNetworkAddress(), serviceId); } - return false; + + exchanged = false; } else { standardBuilder.toBuilder(); standardBuilder.setNetworkAddressId(networkAddressId); standardBuilder.setNetworkAddress(Const.EMPTY_STRING); } } - return true; + return exchanged; } /** diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java index dfeeb33af2..42bf56c8d1 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java @@ -63,6 +63,8 @@ public class SpanIdExchanger implements IdExchanger { } @Override public boolean exchange(SpanDecorator standardBuilder, int serviceId) { + boolean exchanged = true; + if (standardBuilder.getComponentId() == 0 && !Strings.isNullOrEmpty(standardBuilder.getComponent())) { int componentId = componentLibraryCatalogService.getComponentId(standardBuilder.getComponent()); @@ -70,7 +72,8 @@ public class SpanIdExchanger implements IdExchanger { if (logger.isDebugEnabled()) { logger.debug("component: {} in service: {} exchange failed", standardBuilder.getComponent(), serviceId); } - return false; + + exchanged = false; } else { standardBuilder.toBuilder(); standardBuilder.setComponentId(componentId); @@ -86,7 +89,8 @@ public class SpanIdExchanger implements IdExchanger { if (logger.isDebugEnabled()) { logger.debug("peer: {} in service: {} exchange failed", standardBuilder.getPeer(), serviceId); } - return false; + + exchanged = false; } else { standardBuilder.toBuilder(); standardBuilder.setPeerId(peerId); @@ -123,14 +127,15 @@ public class SpanIdExchanger implements IdExchanger { if (logger.isDebugEnabled()) { logger.debug("endpoint name: {} from service id: {} exchange failed", endpointName, serviceId); } - return false; + + exchanged = false; } else { standardBuilder.toBuilder(); standardBuilder.setOperationNameId(endpointId); standardBuilder.setOperationName(Const.EMPTY_STRING); } } - return true; + return exchanged; } private JsonObject buildServiceProperties(SpanDecorator standardBuilder) { diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java index 53e65574b3..2dec7cc497 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java @@ -43,10 +43,18 @@ public class RegisterLockInstaller { } public void install() throws StorageException { + boolean debug = System.getProperty("debug") != null; + try { if (!client.isExistsIndex(RegisterLockIndex.NAME)) { + logger.info("table: {} does not exist", RegisterLockIndex.NAME); + createIndex(); + } else if (debug) { + logger.info("table: {} exists", RegisterLockIndex.NAME); + deleteIndex(); createIndex(); } + for (Class registerSource : InventoryProcess.INSTANCE.getAllRegisterSources()) { Scope sourceScope = StorageEntityAnnotationUtils.getSourceScope(registerSource); putIfAbsent(sourceScope.ordinal()); @@ -56,6 +64,10 @@ public class RegisterLockInstaller { } } + private void deleteIndex() throws IOException { + client.deleteIndex(RegisterLockIndex.NAME); + } + private void createIndex() throws IOException { Settings settings = Settings.builder() .put("index.number_of_shards", 1) -- GitLab