AbstractPost.java 2.8 KB
Newer Older
1 2
package com.a.eye.skywalking.collector.worker.httpserver;

P
pengys5 已提交
3 4 5 6 7
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalAsyncWorkerRef;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
8
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
9
import com.google.gson.JsonObject;
10
import com.google.gson.stream.JsonReader;
P
pengys5 已提交
11 12
import java.io.BufferedReader;
import java.io.IOException;
13 14 15
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
P
pengys5 已提交
16 17
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
18 19 20 21

/**
 * @author pengys5
 */
P
pengys5 已提交
22

23 24 25 26 27 28
public abstract class AbstractPost extends AbstractLocalAsyncWorker {

    public AbstractPost(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
        super(role, clusterContext, selfContext);
    }

P
pengys5 已提交
29
    @Override final public void onWork(Object message) throws Exception {
30
        onReceive(message);
31 32
    }

33
    protected abstract void onReceive(Object message) throws Exception;
34 35 36

    static class PostWithHttpServlet extends AbstractHttpServlet {

P
pengys5 已提交
37 38
        private Logger logger = LogManager.getFormatterLogger(PostWithHttpServlet.class);

39 40
        private final LocalAsyncWorkerRef ownerWorkerRef;

P
pengys5 已提交
41
        PostWithHttpServlet(LocalAsyncWorkerRef ownerWorkerRef) {
42 43 44
            this.ownerWorkerRef = ownerWorkerRef;
        }

P
pengys5 已提交
45 46
        @Override        final protected void doPost(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
47 48 49
            JsonObject resJson = new JsonObject();
            try {
                BufferedReader bufferedReader = request.getReader();
50
                streamReader(bufferedReader);
51 52
                reply(response, resJson, HttpServletResponse.SC_OK);
            } catch (Exception e) {
P
pengys5 已提交
53
                logger.error(e);
54 55 56 57
                resJson.addProperty("error", e.getMessage());
                reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
            }
        }
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73

        private void streamReader(BufferedReader bufferedReader) throws Exception {
            try (JsonReader reader = new JsonReader(bufferedReader)) {
                readSegmentArray(reader);
            }
        }

        private void readSegmentArray(JsonReader reader) throws Exception {
            reader.beginArray();
            while (reader.hasNext()) {
                Segment segment = new Segment();
                segment.deserialize(reader);
                ownerWorkerRef.tell(segment);
            }
            reader.endArray();
        }
74 75
    }
}