SegmentPost.java 7.2 KB
Newer Older
1 2 3 4 5 6 7 8 9
package com.a.eye.skywalking.collector.worker.segment;

import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
P
pengys5 已提交
10
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
P
pengys5 已提交
11
import com.a.eye.skywalking.collector.worker.globaltrace.analysis.GlobalTraceAnalysis;
12 13
import com.a.eye.skywalking.collector.worker.httpserver.AbstractPost;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractPostProvider;
14 15 16 17
import com.a.eye.skywalking.collector.worker.node.analysis.NodeCompAnalysis;
import com.a.eye.skywalking.collector.worker.node.analysis.NodeMappingDayAnalysis;
import com.a.eye.skywalking.collector.worker.node.analysis.NodeMappingHourAnalysis;
import com.a.eye.skywalking.collector.worker.node.analysis.NodeMappingMinuteAnalysis;
18 19 20
import com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefDayAnalysis;
import com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefHourAnalysis;
import com.a.eye.skywalking.collector.worker.noderef.analysis.NodeRefMinuteAnalysis;
P
Fix 159  
pengys5 已提交
21 22 23
import com.a.eye.skywalking.collector.worker.segment.analysis.SegmentAnalysis;
import com.a.eye.skywalking.collector.worker.segment.analysis.SegmentCostAnalysis;
import com.a.eye.skywalking.collector.worker.segment.analysis.SegmentExceptionAnalysis;
24
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
25 26 27 28 29 30 31 32 33
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * @author pengys5
 */
public class SegmentPost extends AbstractPost {
34
    private static final Logger logger = LogManager.getFormatterLogger(SegmentPost.class);
35 36 37 38 39 40 41

    public SegmentPost(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
        super(role, clusterContext, selfContext);
    }

    @Override
    public void preStart() throws ProviderNotFoundException {
P
pengys5 已提交
42 43
        getClusterContext().findProvider(GlobalTraceAnalysis.Role.INSTANCE).create(this);

P
Fix 159  
pengys5 已提交
44 45 46
        getClusterContext().findProvider(SegmentAnalysis.Role.INSTANCE).create(this);
        getClusterContext().findProvider(SegmentCostAnalysis.Role.INSTANCE).create(this);
        getClusterContext().findProvider(SegmentExceptionAnalysis.Role.INSTANCE).create(this);
47 48 49 50 51

        getClusterContext().findProvider(NodeRefMinuteAnalysis.Role.INSTANCE).create(this);
        getClusterContext().findProvider(NodeRefHourAnalysis.Role.INSTANCE).create(this);
        getClusterContext().findProvider(NodeRefDayAnalysis.Role.INSTANCE).create(this);

P
Fix 159  
pengys5 已提交
52 53
        getClusterContext().findProvider(NodeCompAnalysis.Role.INSTANCE).create(this);

P
pengys5 已提交
54 55 56
        getClusterContext().findProvider(NodeMappingDayAnalysis.Role.INSTANCE).create(this);
        getClusterContext().findProvider(NodeMappingHourAnalysis.Role.INSTANCE).create(this);
        getClusterContext().findProvider(NodeMappingMinuteAnalysis.Role.INSTANCE).create(this);
57 58 59
    }

    @Override
60 61 62
    protected void onReceive(Object message) throws Exception {
        if (message instanceof Segment) {
            Segment segment = (Segment) message;
P
pengys5 已提交
63
            try {
64
                validateData(segment);
P
pengys5 已提交
65
            } catch (IllegalArgumentException e) {
66
                return;
P
pengys5 已提交
67 68
            }

69
            logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", segment.getTraceSegmentId());
P
pengys5 已提交
70

71 72 73 74
            long minuteSlice = DateTools.getMinuteSlice(segment.getStartTime());
            long hourSlice = DateTools.getHourSlice(segment.getStartTime());
            long daySlice = DateTools.getDaySlice(segment.getStartTime());
            int second = DateTools.getSecond(segment.getStartTime());
75
            logger.debug("minuteSlice: %s, hourSlice: %s, daySlice: %s, second:%s", minuteSlice, hourSlice, daySlice, second);
P
pengys5 已提交
76

77
            SegmentWithTimeSlice segmentWithTimeSlice = new SegmentWithTimeSlice(segment, minuteSlice, hourSlice, daySlice, second);
P
Fix 159  
pengys5 已提交
78
            getSelfContext().lookup(SegmentAnalysis.Role.INSTANCE).tell(segment);
P
pengys5 已提交
79

P
Fix 159  
pengys5 已提交
80
            getSelfContext().lookup(SegmentCostAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
P
pengys5 已提交
81
            getSelfContext().lookup(GlobalTraceAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
P
Fix 159  
pengys5 已提交
82
            getSelfContext().lookup(SegmentExceptionAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
P
pengys5 已提交
83

P
pengys5 已提交
84
            getSelfContext().lookup(NodeCompAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
P
pengys5 已提交
85

P
pengys5 已提交
86
            tellNodeRef(segmentWithTimeSlice);
P
pengys5 已提交
87
            tellNodeMapping(segmentWithTimeSlice);
P
pengys5 已提交
88
        }
89 90 91 92
    }

    private void tellNodeRef(SegmentWithTimeSlice segmentWithTimeSlice) throws Exception {
        getSelfContext().lookup(NodeRefMinuteAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
P
pengys5 已提交
93 94
        getSelfContext().lookup(NodeRefHourAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
        getSelfContext().lookup(NodeRefDayAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
95 96
    }

P
pengys5 已提交
97 98 99 100
    private void tellNodeMapping(SegmentWithTimeSlice segmentWithTimeSlice) throws Exception {
        getSelfContext().lookup(NodeMappingMinuteAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
        getSelfContext().lookup(NodeMappingHourAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
        getSelfContext().lookup(NodeMappingDayAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
P
pengys5 已提交
101 102
    }

103 104
    private void validateData(Segment segment) {
        if (StringUtil.isEmpty(segment.getTraceSegmentId())) {
105 106
            throw new IllegalArgumentException("traceSegmentId required");
        }
107
        if (0 == segment.getStartTime()) {
108 109 110 111 112 113 114
            throw new IllegalArgumentException("startTime required");
        }
    }

    public static class Factory extends AbstractPostProvider<SegmentPost> {
        @Override
        public String servletPath() {
115
            return "/segments";
116 117 118 119
        }

        @Override
        public int queueSize() {
120
            return WorkerConfig.Queue.Segment.SegmentPost.SIZE;
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
        }

        @Override
        public Role role() {
            return WorkerRole.INSTANCE;
        }

        @Override
        public SegmentPost workerInstance(ClusterWorkerContext clusterContext) {
            return new SegmentPost(role(), clusterContext, new LocalWorkerContext());
        }
    }

    public enum WorkerRole implements Role {
        INSTANCE;

        @Override
        public String roleName() {
            return SegmentPost.class.getSimpleName();
        }

        @Override
        public WorkerSelector workerSelector() {
            return new RollingSelector();
        }
    }

    public static class SegmentWithTimeSlice extends AbstractTimeSlice {
149
        private final Segment segment;
150

151
        public SegmentWithTimeSlice(Segment segment, long minute, long hour, long day, int second) {
152
            super(minute, hour, day, second);
153
            this.segment = segment;
154 155
        }

156 157
        public Segment getSegment() {
            return segment;
158 159 160
        }
    }
}