BufferGroup.java 4.9 KB
Newer Older
A
ascrutae 已提交
1
package com.a.eye.skywalking.buffer;
2

A
ascrutae 已提交
3 4 5 6 7 8 9 10
import com.a.eye.skywalking.conf.Config;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.logging.Logger;
import com.a.eye.skywalking.selfexamination.HeathReading;
import com.a.eye.skywalking.selfexamination.SDKHealthCollector;
import com.a.eye.skywalking.sender.DataSenderFactoryWithBalance;
import com.a.eye.skywalking.protocol.common.ISerializable;
import com.a.eye.skywalking.protocol.util.AtomicRangeInteger;
11

wu-sheng's avatar
wu-sheng 已提交
12 13 14
import java.util.ArrayList;
import java.util.List;

15
public class BufferGroup {
16 17
    private static Logger logger = LogManager.getLogger(BufferGroup.class);
    private String groupName;
A
ascrutae 已提交
18
    //注意: 修改这个变量名,需要修改test-api工程的Config类中的SPAN_ARRAY_FIELD_NAME变量
A
ascrutae 已提交
19 20
    private ISerializable[] dataBuffer = new ISerializable[Config.Buffer.BUFFER_MAX_SIZE];
    AtomicRangeInteger index = new AtomicRangeInteger(0, Config.Buffer.BUFFER_MAX_SIZE);
21

22 23 24 25
    public BufferGroup(String groupName) {
        this.groupName = groupName;
        startConsumerWorker();
    }
26

27
    private void startConsumerWorker() {
A
ascrutae 已提交
28 29
        if (Config.Consumer.MAX_CONSUMER > 0) {
            int step = (int) Math.ceil(Config.Buffer.BUFFER_MAX_SIZE * 1.0 / Config.Consumer.MAX_CONSUMER);
30 31
            int start = 0, end = 0;
            while (true) {
A
ascrutae 已提交
32 33
                if (end + step >= Config.Buffer.BUFFER_MAX_SIZE) {
                    new ConsumerWorker(start, Config.Buffer.BUFFER_MAX_SIZE).start();
34 35 36 37 38 39 40 41
                    break;
                }
                end += step;
                new ConsumerWorker(start, end).start();
                start = end;
            }
        }
    }
42

A
ascrutae 已提交
43
    public void save(ISerializable data) {
44 45 46 47 48
        int i = index.getAndIncrement();
        if (dataBuffer[i] != null) {
            logger.warn(
                    "Group[{}] index[{}] data collision, discard old data.",
                    groupName, i);
A
ascrutae 已提交
49 50
            SDKHealthCollector
                    .getCurrentHeathReading("BufferGroup").updateData(HeathReading.WARNING, "BufferGroup index[" + i + "] data collision, data been coverd.");
51
        }
A
ascrutae 已提交
52
        dataBuffer[i] = data;
53 54
        SDKHealthCollector.getCurrentHeathReading("BufferGroup").updateData(HeathReading.INFO, "save span");
    }
55

56 57
    class ConsumerWorker extends Thread {
        private int start = 0;
A
ascrutae 已提交
58
        private int end = Config.Buffer.BUFFER_MAX_SIZE;
59

60 61 62 63 64
        private ConsumerWorker(int start, int end) {
            super("ConsumerWorker");
            this.start = start;
            this.end = end;
        }
65

66 67
        @Override
        public void run() {
wu-sheng's avatar
wu-sheng 已提交
68
            List<ISerializable> packageData = new ArrayList<ISerializable>();
69 70 71 72 73 74 75 76
            while (true) {
                boolean bool = false;
                try {
                    for (int i = start; i < end; i++) {
                        if (dataBuffer[i] == null) {
                            continue;
                        }
                        bool = true;
A
ascrutae 已提交
77
                        if (packageData.size() >= Config.Sender.MAX_SEND_DATA_SIZE) {
78
                            while (!DataSenderFactoryWithBalance.getSender()
A
ascrutae 已提交
79
                                    .send(packageData)) {
80
                                try {
A
ascrutae 已提交
81
                                    Thread.sleep(Config.Consumer.CONSUMER_FAIL_RETRY_WAIT_INTERVAL);
82 83 84 85
                                } catch (InterruptedException e) {
                                    logger.error("Sleep Failure");
                                }
                            }
A
ascrutae 已提交
86 87
                            logger.debug("send buried-point data, size:{}", packageData.size());
                            packageData = new ArrayList<ISerializable>();
88
                        }
89

A
ascrutae 已提交
90
                        packageData.add(dataBuffer[i]);
91 92
                        dataBuffer[i] = null;
                    }
93

A
ascrutae 已提交
94
                    if (packageData != null && packageData.size() > 0) {
95
                        while (!DataSenderFactoryWithBalance.getSender().send(
A
ascrutae 已提交
96
                                packageData)) {
97
                            try {
A
ascrutae 已提交
98
                                Thread.sleep(Config.Consumer.CONSUMER_FAIL_RETRY_WAIT_INTERVAL);
99 100 101 102
                            } catch (InterruptedException e) {
                                logger.error("Sleep Failure");
                            }
                        }
A
ascrutae 已提交
103
                        packageData = new ArrayList<ISerializable>();
104 105 106 107
                    }
                } catch (Throwable e) {
                    logger.error("buffer group running failed", e);
                }
108

109 110
                if (!bool) {
                    try {
A
ascrutae 已提交
111
                        Thread.sleep(Config.Consumer.MAX_WAIT_TIME);
112 113 114 115 116 117 118
                    } catch (InterruptedException e) {
                        logger.error("Sleep Failure");
                    }
                }
            }
        }
    }
119

120 121 122
    public String getGroupName() {
        return groupName;
    }
123 124

}