SegmentCostSpanListener.java 4.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright 2017, OpenSkywalking Organization All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Project repository: https://github.com/OpenSkywalking/skywalking
 */

P
pengys5 已提交
19 20 21 22 23
package org.skywalking.apm.collector.agentstream.worker.segment.cost;

import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
P
pengys5 已提交
24 25 26
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
27
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanDecorator;
28
import org.skywalking.apm.collector.cache.ServiceNameCache;
P
pengys5 已提交
29
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
30 31
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.define.segment.SegmentCostDataDefine;
P
pengys5 已提交
32 33 34 35 36 37 38 39 40 41
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author pengys5
 */
42
public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener, FirstSpanListener {
P
pengys5 已提交
43 44 45 46

    private final Logger logger = LoggerFactory.getLogger(SegmentCostSpanListener.class);

    private List<SegmentCostDataDefine.SegmentCost> segmentCosts = new ArrayList<>();
P
pengys5 已提交
47 48 49 50
    private boolean isError = false;
    private long timeBucket;

    @Override
51 52 53
    public void parseFirst(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
        String segmentId) {
        timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
P
pengys5 已提交
54 55

        SegmentCostDataDefine.SegmentCost segmentCost = new SegmentCostDataDefine.SegmentCost();
56
        segmentCost.setSegmentId(segmentId);
57
        segmentCost.setApplicationId(applicationId);
58 59 60
        segmentCost.setCost(spanDecorator.getEndTime() - spanDecorator.getStartTime());
        segmentCost.setStartTime(spanDecorator.getStartTime());
        segmentCost.setEndTime(spanDecorator.getEndTime());
61
        segmentCost.setId(segmentId);
62 63
        if (spanDecorator.getOperationNameId() == 0) {
            segmentCost.setServiceName(spanDecorator.getOperationName());
64
        } else {
65
            segmentCost.setServiceName(ServiceNameCache.getSplitServiceName(ServiceNameCache.get(spanDecorator.getOperationNameId())));
66
        }
P
pengys5 已提交
67

68
        segmentCosts.add(segmentCost);
69
        isError = isError || spanDecorator.getIsError();
P
pengys5 已提交
70 71
    }

A
ascrutae 已提交
72
    @Override
73 74 75
    public void parseEntry(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
        String segmentId) {
        isError = isError || spanDecorator.getIsError();
A
ascrutae 已提交
76 77
    }

P
pengys5 已提交
78
    @Override
79 80
    public void parseExit(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId, String segmentId) {
        isError = isError || spanDecorator.getIsError();
P
pengys5 已提交
81 82 83
    }

    @Override
84 85 86
    public void parseLocal(SpanDecorator spanDecorator, int applicationId, int applicationInstanceId,
        String segmentId) {
        isError = isError || spanDecorator.getIsError();
P
pengys5 已提交
87 88 89 90 91 92 93
    }

    @Override public void build() {
        logger.debug("segment cost listener build");
        StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);

        for (SegmentCostDataDefine.SegmentCost segmentCost : segmentCosts) {
94 95 96 97
            segmentCost.setError(isError);
            segmentCost.setTimeBucket(timeBucket);
            try {
                logger.debug("send to segment cost persistence worker, id: {}", segmentCost.getId());
98
                context.getClusterWorkerContext().lookup(SegmentCostPersistenceWorker.WorkerRole.INSTANCE).tell(segmentCost.toData());
99 100
            } catch (WorkerInvokeException | WorkerNotFoundException e) {
                logger.error(e.getMessage(), e);
P
pengys5 已提交
101 102 103
            }
        }
    }
104
}