BufferGroup.java 4.9 KB
Newer Older
A
ascrutae 已提交
1
package com.a.eye.skywalking.buffer;
2

A
ascrutae 已提交
3 4 5 6 7 8 9 10
import com.a.eye.skywalking.conf.Config;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.logging.Logger;
import com.a.eye.skywalking.selfexamination.HeathReading;
import com.a.eye.skywalking.selfexamination.SDKHealthCollector;
import com.a.eye.skywalking.sender.DataSenderFactoryWithBalance;
import com.a.eye.skywalking.protocol.common.ISerializable;
import com.a.eye.skywalking.protocol.util.AtomicRangeInteger;
11

wu-sheng's avatar
wu-sheng 已提交
12 13 14
import java.util.ArrayList;
import java.util.List;

15
public class BufferGroup {
16 17
    private static Logger logger = LogManager.getLogger(BufferGroup.class);
    private String groupName;
A
ascrutae 已提交
18
    //注意: 修改这个变量名,需要修改test-api工程的Config类中的SPAN_ARRAY_FIELD_NAME变量
A
ascrutae 已提交
19 20
    private ISerializable[] dataBuffer = new ISerializable[Config.Buffer.BUFFER_MAX_SIZE];
    AtomicRangeInteger index = new AtomicRangeInteger(0, Config.Buffer.BUFFER_MAX_SIZE);
21

22 23 24 25
    public BufferGroup(String groupName) {
        this.groupName = groupName;
        startConsumerWorker();
    }
26

27
    private void startConsumerWorker() {
A
ascrutae 已提交
28 29
        if (Config.Consumer.MAX_CONSUMER > 0) {
            int step = (int) Math.ceil(Config.Buffer.BUFFER_MAX_SIZE * 1.0 / Config.Consumer.MAX_CONSUMER);
30 31
            int start = 0, end = 0;
            while (true) {
A
ascrutae 已提交
32 33
                if (end + step >= Config.Buffer.BUFFER_MAX_SIZE) {
                    new ConsumerWorker(start, Config.Buffer.BUFFER_MAX_SIZE).start();
34 35 36 37 38 39 40 41
                    break;
                }
                end += step;
                new ConsumerWorker(start, end).start();
                start = end;
            }
        }
    }
42

A
ascrutae 已提交
43
    public void save(ISerializable data) {
44 45 46 47 48
        int i = index.getAndIncrement();
        if (dataBuffer[i] != null) {
            logger.warn(
                    "Group[{}] index[{}] data collision, discard old data.",
                    groupName, i);
A
ascrutae 已提交
49 50
            SDKHealthCollector
                    .getCurrentHeathReading("BufferGroup").updateData(HeathReading.WARNING, "BufferGroup index[" + i + "] data collision, data been coverd.");
51
        }
A
ascrutae 已提交
52
        dataBuffer[i] = data;
53 54
        SDKHealthCollector.getCurrentHeathReading("BufferGroup").updateData(HeathReading.INFO, "save span");
    }
55

56 57
    class ConsumerWorker extends Thread {
        private int start = 0;
A
ascrutae 已提交
58
        private int end = Config.Buffer.BUFFER_MAX_SIZE;
59

60 61 62 63
        private ConsumerWorker(int start, int end) {
            super("ConsumerWorker");
            this.start = start;
            this.end = end;
A
ascrutae 已提交
64
            this.setDaemon(true);
65
        }
66

67 68
        @Override
        public void run() {
wu-sheng's avatar
wu-sheng 已提交
69
            List<ISerializable> packageData = new ArrayList<ISerializable>();
70 71 72 73 74 75 76 77
            while (true) {
                boolean bool = false;
                try {
                    for (int i = start; i < end; i++) {
                        if (dataBuffer[i] == null) {
                            continue;
                        }
                        bool = true;
A
ascrutae 已提交
78
                        if (packageData.size() >= Config.Sender.MAX_SEND_DATA_SIZE) {
79
                            while (!DataSenderFactoryWithBalance.getSender()
A
ascrutae 已提交
80
                                    .send(packageData)) {
81
                                try {
A
ascrutae 已提交
82
                                    Thread.sleep(Config.Consumer.CONSUMER_FAIL_RETRY_WAIT_INTERVAL);
83 84 85 86
                                } catch (InterruptedException e) {
                                    logger.error("Sleep Failure");
                                }
                            }
A
ascrutae 已提交
87 88
                            logger.debug("send buried-point data, size:{}", packageData.size());
                            packageData = new ArrayList<ISerializable>();
89
                        }
90

A
ascrutae 已提交
91
                        packageData.add(dataBuffer[i]);
92 93
                        dataBuffer[i] = null;
                    }
94

A
ascrutae 已提交
95
                    if (packageData != null && packageData.size() > 0) {
96
                        while (!DataSenderFactoryWithBalance.getSender().send(
A
ascrutae 已提交
97
                                packageData)) {
98
                            try {
A
ascrutae 已提交
99
                                Thread.sleep(Config.Consumer.CONSUMER_FAIL_RETRY_WAIT_INTERVAL);
100 101 102 103
                            } catch (InterruptedException e) {
                                logger.error("Sleep Failure");
                            }
                        }
A
ascrutae 已提交
104
                        packageData = new ArrayList<ISerializable>();
105 106 107 108
                    }
                } catch (Throwable e) {
                    logger.error("buffer group running failed", e);
                }
109

110 111
                if (!bool) {
                    try {
A
ascrutae 已提交
112
                        Thread.sleep(Config.Consumer.MAX_WAIT_TIME);
113 114 115 116 117 118 119
                    } catch (InterruptedException e) {
                        logger.error("Sleep Failure");
                    }
                }
            }
        }
    }
120

121 122 123
    public String getGroupName() {
        return groupName;
    }
124 125

}