From e38b6ad1bd3c9f743bacbf5a8d2246987857f108 Mon Sep 17 00:00:00 2001
From: pengys5 <8082209@qq.com>
Date: Sun, 11 Jun 2017 23:49:52 +0800
Subject: [PATCH] 1. Classify http servlet interface to get, post, stream post.
2. Change the http servlet worker from local asynchronous call to local
synchronous call.
---
.../actor/AbstractLocalSyncWorker.java | 27 +++++
.../collector/worker/config/WorkerConfig.java | 4 -
.../GlobalTraceGetWithGlobalId.java | 14 +--
.../worker/httpserver/AbstractGet.java | 55 ++++++----
.../httpserver/AbstractGetProvider.java | 19 +++-
.../httpserver/AbstractHttpServlet.java | 25 -----
.../worker/httpserver/AbstractPost.java | 85 +++++----------
.../httpserver/AbstractPostProvider.java | 21 +++-
.../worker/httpserver/AbstractServlet.java | 99 +++++++++++++++++
.../worker/httpserver/AbstractStreamPost.java | 103 ++++++++++++++++++
.../AbstractStreamPostProvider.java | 35 ++++++
.../worker/httpserver/ServletsCreator.java | 32 ++++--
.../NodeRefResSumGetGroupWithTimeSlice.java | 20 ++--
.../collector/worker/segment/SegmentPost.java | 101 +++++++++++------
.../SegmentTopGetWithGlobalTraceId.java | 20 ++--
.../segment/SegmentTopGetWithTimeSlice.java | 30 +++--
.../collector/worker/span/SpanGetWithId.java | 16 ++-
.../tracedag/TraceDagGetWithTimeSlice.java | 20 ++--
...tor.worker.httpserver.AbstractPostProvider | 1 -
...rker.httpserver.AbstractStreamPostProvider | 1 +
.../GlobalTraceGetWithGlobalIdTestCase.java | 4 +-
.../httpserver/AbstractGetTestCase.java | 37 +++++--
.../httpserver/AbstractPostTestCase.java | 24 ++--
.../PostWithHttpServletTestCase.java | 30 ++---
.../worker/httpserver/TestAbstractGet.java | 7 +-
.../worker/httpserver/TestAbstractPost.java | 11 +-
.../httpserver/TestAbstractPostProvider.java | 5 -
...efResSumGetGroupWithTimeSliceTestCase.java | 35 +++---
.../worker/segment/SegmentPostTestCase.java | 49 +++++----
.../worker/segment/SegmentRealPost.java | 1 +
...egmentTopGetWithGlobalTraceIdTestCase.java | 8 +-
.../SegmentTopGetWithTimeSliceTestCase.java | 10 +-
.../worker/segment/mock/SegmentMock.java | 2 +-
.../worker/span/SpanGetWithIdTestCase.java | 4 +-
.../TraceDagGetWithTimeSliceTestCase.java | 8 +-
.../feign-default-http-9.x-plugin/pom.xml | 1 +
.../httpClient-4.x-plugin/pom.xml | 9 --
37 files changed, 642 insertions(+), 331 deletions(-)
delete mode 100644 apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractHttpServlet.java
create mode 100644 apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractServlet.java
create mode 100644 apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractStreamPost.java
create mode 100644 apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractStreamPostProvider.java
create mode 100644 apm-collector/apm-collector-worker/src/main/resources/META-INF/services/org.skywalking.apm.collector.worker.httpserver.AbstractStreamPostProvider
diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/actor/AbstractLocalSyncWorker.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/actor/AbstractLocalSyncWorker.java
index cb65224e90..1c558c9be3 100644
--- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/actor/AbstractLocalSyncWorker.java
+++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/actor/AbstractLocalSyncWorker.java
@@ -1,19 +1,46 @@
package org.skywalking.apm.collector.actor;
/**
+ * The AbstractLocalSyncWorker
use to define workers that receive data from jvm inside call and the
+ * workers response result in real time.
+ *
+ *
The implementation class is same as normal class, it make the framework be similar to the asynchronous
+ * workers inside jvm and outside jvm.
+ *
* @author pengys5
+ * @since v3.0-2017
*/
public abstract class AbstractLocalSyncWorker extends AbstractLocalWorker {
public AbstractLocalSyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
+ /**
+ * Called by the worker reference to execute the worker service.
+ *
+ * @param request {@link Object} is a in parameter
+ * @param response {@link Object} is a out parameter
+ * @throws Exception
+ */
final public void allocateJob(Object request, Object response) throws Exception {
onWork(request, response);
}
+ /**
+ * Override this method to implementing business logic.
+ *
+ * @param request {@link Object} is a in parameter
+ * @param response {@link Object} is a out parameter
+ * @throws Exception
+ */
protected abstract void onWork(Object request, Object response) throws Exception;
+ /**
+ * Called by the worker on start.
+ *
Usually, create or find the workers reference should be call.
+ *
+ * @throws ProviderNotFoundException
+ */
@Override
public void preStart() throws ProviderNotFoundException {
}
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/config/WorkerConfig.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/config/WorkerConfig.java
index 1463e91e15..46ec2d84ee 100644
--- a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/config/WorkerConfig.java
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/config/WorkerConfig.java
@@ -65,10 +65,6 @@ public class WorkerConfig {
}
public static class Segment {
- public static class SegmentPost {
- public static int SIZE = 4096;
- }
-
public static class SegmentAnalysis {
public static int SIZE = 1024;
}
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/globaltrace/GlobalTraceGetWithGlobalId.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/globaltrace/GlobalTraceGetWithGlobalId.java
index 9ca9002616..0db5a073bf 100644
--- a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/globaltrace/GlobalTraceGetWithGlobalId.java
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/globaltrace/GlobalTraceGetWithGlobalId.java
@@ -1,6 +1,8 @@
package org.skywalking.apm.collector.worker.globaltrace;
import com.google.gson.JsonObject;
+import java.util.Arrays;
+import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
@@ -14,9 +16,6 @@ import org.skywalking.apm.collector.worker.httpserver.AbstractGet;
import org.skywalking.apm.collector.worker.httpserver.AbstractGetProvider;
import org.skywalking.apm.collector.worker.tools.ParameterTools;
-import java.util.Arrays;
-import java.util.Map;
-
/**
* @author pengys5
*/
@@ -33,14 +32,13 @@ public class GlobalTraceGetWithGlobalId extends AbstractGet {
getClusterContext().findProvider(GlobalTraceSearchWithGlobalId.WorkerRole.INSTANCE).create(this);
}
- @Override
- protected void onSearch(Map request, JsonObject response) throws Exception {
- if (!request.containsKey("globalId")) {
+ @Override protected void onReceive(Map parameter, JsonObject response) throws Exception {
+ if (!parameter.containsKey("globalId")) {
throw new IllegalArgumentException("the request parameter must contains globalId");
}
- logger.debug("globalId: %s", Arrays.toString(request.get("globalId")));
+ logger.debug("globalId: %s", Arrays.toString(parameter.get("globalId")));
- String globalId = ParameterTools.INSTANCE.toString(request, "globalId");
+ String globalId = ParameterTools.INSTANCE.toString(parameter, "globalId");
getSelfContext().lookup(GlobalTraceSearchWithGlobalId.WorkerRole.INSTANCE).ask(globalId, response);
}
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractGet.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractGet.java
index 1054b66dc4..c7774c0aad 100644
--- a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractGet.java
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractGet.java
@@ -4,41 +4,45 @@ import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.Map;
import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalSyncWorkerRef;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
/**
+ * The AbstractGet
implementations represent workers, which called by the server to allow a servlet to
+ * handle a GET request.
+ *
+ * verride the {@link #onReceive(Map, JsonObject)} method to support a search service.
+ *
* @author pengys5
+ * @since v3.0-2017
*/
-public abstract class AbstractGet extends AbstractLocalSyncWorker {
-
- private Logger logger = LogManager.getFormatterLogger(AbstractGet.class);
+public abstract class AbstractGet extends AbstractServlet {
protected AbstractGet(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
- @Override final public void onWork(Object request, Object response) throws Exception {
- Map parameterMap = (Map)request;
- try {
- onSearch(parameterMap, (JsonObject)response);
- } catch (Exception e) {
- logger.error(e, e);
- ((JsonObject)response).addProperty("isSuccess", false);
- ((JsonObject)response).addProperty("reason", e.getMessage());
- }
+ /**
+ * Add final modifier to avoid the subclass override this method.
+ *
+ * @param parameter {@link Object} data structure of the map
+ * @param response {@link Object} is a out parameter
+ * @throws Exception
+ */
+ @Override final protected void onWork(Object parameter, Object response) throws Exception {
+ super.onWork(parameter, response);
}
- protected abstract void onSearch(Map request, JsonObject response) throws Exception;
+ static class GetWithHttpServlet extends HttpServlet {
- static class GetWithHttpServlet extends AbstractHttpServlet {
+ private Logger logger = LogManager.getFormatterLogger(GetWithHttpServlet.class);
private final LocalSyncWorkerRef ownerWorkerRef;
@@ -46,17 +50,24 @@ public abstract class AbstractGet extends AbstractLocalSyncWorker {
this.ownerWorkerRef = ownerWorkerRef;
}
+ /**
+ * Override the {@link HttpServlet#doGet(HttpServletRequest, HttpServletResponse)} method, receive the parameter
+ * from request then send parameter to the owner worker.
+ *
+ * @param request an {@link HttpServletRequest} object that contains the request the client has made of the
+ * servlet
+ * @param response an {@link HttpServletResponse} object that contains the response the servlet sends to the
+ * client
+ * @throws ServletException if the request for the GET could not be handled
+ * @throws IOException if an input or output error is detected when the servlet handles the GET request
+ */
@Override final protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
- Map parameterMap = request.getParameterMap();
-
- JsonObject resJson = new JsonObject();
try {
- ownerWorkerRef.ask(parameterMap, resJson);
- reply(response, resJson, HttpServletResponse.SC_OK);
+ Map parameter = request.getParameterMap();
+ ownerWorkerRef.ask(parameter, response);
} catch (Exception e) {
- resJson.addProperty("error", e.getMessage());
- reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ logger.error(e, e);
}
}
}
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractGetProvider.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractGetProvider.java
index 61d4c2889b..6b34c08576 100644
--- a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractGetProvider.java
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractGetProvider.java
@@ -2,18 +2,33 @@ package org.skywalking.apm.collector.worker.httpserver;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
-import org.skywalking.apm.collector.actor.*;
+import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
+import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
+import org.skywalking.apm.collector.actor.AbstractWorker;
+import org.skywalking.apm.collector.actor.LocalSyncWorkerRef;
+import org.skywalking.apm.collector.actor.ProviderNotFoundException;
/**
+ * The AbstractGetProvider
implementations represent providers, which use to create {@link AbstractGet}
+ * worker instances.
+ *
* @author pengys5
+ * @since v3.0-2017
*/
public abstract class AbstractGetProvider extends AbstractLocalSyncWorkerProvider {
public abstract String servletPath();
+ /**
+ * Create worker instance, http servlet and set the worker reference into servlet.
+ *
+ * @param context use to create a mapping between url and worker.
+ * @throws IllegalArgumentException
+ * @throws ProviderNotFoundException
+ */
final protected void create(
ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
- LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef) super.create(AbstractWorker.noOwner());
+ LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef)super.create(AbstractWorker.noOwner());
AbstractGet.GetWithHttpServlet getWithHttpServlet = new AbstractGet.GetWithHttpServlet(workerRef);
context.addServlet(new ServletHolder(getWithHttpServlet), servletPath());
}
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractHttpServlet.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractHttpServlet.java
deleted file mode 100644
index 7b77a9daed..0000000000
--- a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractHttpServlet.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.skywalking.apm.collector.worker.httpserver;
-
-import com.google.gson.JsonObject;
-
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.io.PrintWriter;
-
-/**
- * @author pengys5
- */
-public abstract class AbstractHttpServlet extends HttpServlet {
-
- final public void reply(HttpServletResponse response, JsonObject resJson, int status) throws IOException {
- response.setContentType("text/json");
- response.setCharacterEncoding("utf-8");
- response.setStatus(status);
-
- PrintWriter out = response.getWriter();
- out.print(resJson);
- out.flush();
- out.close();
- }
-}
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractPost.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractPost.java
index b79e47d914..5aa913a991 100644
--- a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractPost.java
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractPost.java
@@ -1,49 +1,58 @@
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonObject;
-import java.io.BufferedReader;
import java.io.IOException;
+import java.util.Map;
import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
-import org.skywalking.apm.collector.actor.LocalAsyncWorkerRef;
+import org.skywalking.apm.collector.actor.LocalSyncWorkerRef;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
-import org.skywalking.apm.collector.worker.segment.entity.Segment;
-import org.skywalking.apm.collector.worker.segment.entity.SegmentAndJson;
-import org.skywalking.apm.collector.worker.segment.entity.SegmentDeserialize;
/**
+ * The AbstractGet
implementations represent workers, which called by the server to allow a servlet to
+ * handle a POST request.
+ *
+ * verride the {@link #onReceive(Map, JsonObject)} method to support a search service.
+ *
* @author pengys5
+ * @since v3.0-2017
*/
-public abstract class AbstractPost extends AbstractLocalAsyncWorker {
+public abstract class AbstractPost extends AbstractServlet {
public AbstractPost(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
- @Override final public void onWork(Object message) throws Exception {
- onReceive(message);
+ /**
+ * Add final modifier to avoid the subclass override this method.
+ *
+ * @param parameter {@link Object} data structure of the map
+ * @param response {@link Object} is a out parameter
+ * @throws Exception
+ */
+ @Override final protected void onWork(Object parameter, Object response) throws Exception {
+ super.onWork(parameter, response);
}
- protected abstract void onReceive(Object message) throws Exception;
-
- static class PostWithHttpServlet extends AbstractHttpServlet {
+ static class PostWithHttpServlet extends HttpServlet {
private Logger logger = LogManager.getFormatterLogger(PostWithHttpServlet.class);
- private final LocalAsyncWorkerRef ownerWorkerRef;
+ private final LocalSyncWorkerRef ownerWorkerRef;
- PostWithHttpServlet(LocalAsyncWorkerRef ownerWorkerRef) {
+ PostWithHttpServlet(LocalSyncWorkerRef ownerWorkerRef) {
this.ownerWorkerRef = ownerWorkerRef;
}
/**
- * Get segment's buffer from request then execute deserialize operation.
+ * Override the {@link HttpServlet#doPost(HttpServletRequest, HttpServletResponse)} method, receive the
+ * parameter from request then send parameter to the owner worker.
*
* @param request an {@link HttpServletRequest} object that contains the request the client has made of the
* servlet
@@ -54,54 +63,12 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
*/
@Override final protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
- JsonObject resJson = new JsonObject();
try {
- BufferedReader bufferedReader = request.getReader();
- streamReader(bufferedReader);
- reply(response, resJson, HttpServletResponse.SC_OK);
+ Map parameter = request.getParameterMap();
+ ownerWorkerRef.ask(parameter, response);
} catch (Exception e) {
logger.error(e, e);
- resJson.addProperty("error", e.getMessage());
- reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
- }
- }
-
- /**
- * Read segment's buffer from buffer reader by stream mode. when finish read one segment then send to analysis.
- * This method in there, so post servlet just can receive segments data.
- *
- * @param bufferedReader an {@link BufferedReader} object that contains the segment's data using the construct of chars.
- * @throws Exception
- */
- private void streamReader(BufferedReader bufferedReader) throws Exception {
- Segment segment;
- do {
- int character;
- StringBuilder builder = new StringBuilder();
- while ((character = bufferedReader.read()) != ' ') {
- if (character == -1) {
- return;
- }
- builder.append((char)character);
- }
-
- int length = Integer.valueOf(builder.toString());
- builder = new StringBuilder();
-
- char[] buffer = new char[length];
- int readLength = bufferedReader.read(buffer, 0, length);
- if (readLength != length) {
- logger.error("The actual data length was different from the length in data head! ");
- return;
- }
- builder.append(buffer);
-
- String segmentJsonStr = builder.toString();
- segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentJsonStr);
-
- ownerWorkerRef.tell(new SegmentAndJson(segment, segmentJsonStr));
}
- while (segment != null);
}
}
}
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractPostProvider.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractPostProvider.java
index f738c57b1b..8a2b7b52d6 100644
--- a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractPostProvider.java
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractPostProvider.java
@@ -2,18 +2,33 @@ package org.skywalking.apm.collector.worker.httpserver;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
-import org.skywalking.apm.collector.actor.*;
+import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
+import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
+import org.skywalking.apm.collector.actor.AbstractWorker;
+import org.skywalking.apm.collector.actor.LocalSyncWorkerRef;
+import org.skywalking.apm.collector.actor.ProviderNotFoundException;
/**
+ * The AbstractPostProvider
implementations represent providers, which use to create {@link AbstractPost}
+ * worker instances.
+ *
* @author pengys5
+ * @since v3.0-2017
*/
-public abstract class AbstractPostProvider extends AbstractLocalAsyncWorkerProvider {
+public abstract class AbstractPostProvider extends AbstractLocalSyncWorkerProvider {
public abstract String servletPath();
+ /**
+ * Create worker instance, http servlet and set the worker reference into servlet.
+ *
+ * @param context use to create a mapping between url and worker.
+ * @throws IllegalArgumentException
+ * @throws ProviderNotFoundException
+ */
final protected void create(
ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
- LocalAsyncWorkerRef workerRef = (LocalAsyncWorkerRef) super.create(AbstractWorker.noOwner());
+ LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef)super.create(AbstractWorker.noOwner());
AbstractPost.PostWithHttpServlet postWithHttpServlet = new AbstractPost.PostWithHttpServlet(workerRef);
context.addServlet(new ServletHolder(postWithHttpServlet), servletPath());
}
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractServlet.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractServlet.java
new file mode 100644
index 0000000000..61e021471b
--- /dev/null
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractServlet.java
@@ -0,0 +1,99 @@
+package org.skywalking.apm.collector.worker.httpserver;
+
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Map;
+import javax.servlet.http.HttpServletResponse;
+import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
+import org.skywalking.apm.collector.actor.ClusterWorkerContext;
+import org.skywalking.apm.collector.actor.LocalWorkerContext;
+import org.skywalking.apm.collector.actor.Role;
+
+/**
+ * The AbstractServlet
implementations represent workers, which called by the server to allow a servlet to
+ * handle a request at least one method, e.g. doGet, doPost, doPut, doDelete.
+ * Provide default {@link #onWork(Object, Object)} implementation, support the data structure of the map from a http
+ * call.
+ *
+ * @author pengys5
+ * @since v3.1-2017
+ */
+public abstract class AbstractServlet extends AbstractLocalSyncWorker {
+
+ public AbstractServlet(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ super(role, clusterContext, selfContext);
+ }
+
+ /**
+ * Override this method to implementing business logic.
+ *
+ * @param parameter {@link Object} data structure of the map
+ * @param response {@link Object} is a out parameter
+ * @throws Exception
+ */
+ @Override protected void onWork(Object parameter, Object response) throws Exception {
+ JsonObject resJson = new JsonObject();
+ try {
+ onReceive((Map)parameter, resJson);
+ onSuccessResponse((HttpServletResponse)response, resJson);
+ } catch (Exception e) {
+ onErrorResponse(e, (HttpServletResponse)response);
+ }
+ }
+
+ /**
+ * Override this method to implementing business logic.
+ *
+ * @param parameter {@link Map}, get the request parameter by key.
+ * @param response {@link JsonObject}, set the response data as json object.
+ * @throws Exception if any error is detected when worker execute business logic.
+ */
+ protected abstract void onReceive(Map parameter, JsonObject response) throws Exception;
+
+ /**
+ * Set the worker response and the success status into the servlet response object
+ *
+ * @param response {@link HttpServletResponse} object that contains the response the servlet reply to the client
+ * @param resJson {@link JsonObject} object that contains the response from worker
+ * @throws IOException
+ */
+ protected void onSuccessResponse(HttpServletResponse response, JsonObject resJson) throws IOException {
+ resJson.addProperty("isSuccess", true);
+ reply(response, resJson, HttpServletResponse.SC_OK);
+ }
+
+ /**
+ * Reply the error message and server error code to client.
+ *
+ * @param exception a {@link Exception} when the worker handles the request
+ * @param response {@link HttpServletResponse} object that contains the response the servlet reply to the client
+ * @throws IOException if an input or output error is detected when the servlet handles the request
+ */
+ protected void onErrorResponse(Exception exception, HttpServletResponse response) throws IOException {
+ JsonObject resJson = new JsonObject();
+ resJson.addProperty("isSuccess", false);
+ resJson.addProperty("reason", exception.getMessage());
+
+ reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ }
+
+ /**
+ * Build the response head and body
+ *
+ * @param response {@link HttpServletResponse} object that contains the response the servlet reply to the client
+ * @param resJson {@link JsonObject} object that contains the response from worker
+ * @param status http status code
+ * @throws IOException if an input or output error is detected when the servlet handles the request
+ */
+ private void reply(HttpServletResponse response, JsonObject resJson, int status) throws IOException {
+ response.setContentType("text/json");
+ response.setCharacterEncoding("utf-8");
+ response.setStatus(status);
+
+ PrintWriter out = response.getWriter();
+ out.print(resJson);
+ out.flush();
+ out.close();
+ }
+}
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractStreamPost.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractStreamPost.java
new file mode 100644
index 0000000000..a8a968bb33
--- /dev/null
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractStreamPost.java
@@ -0,0 +1,103 @@
+package org.skywalking.apm.collector.worker.httpserver;
+
+import com.google.gson.JsonObject;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.Map;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.skywalking.apm.collector.actor.ClusterWorkerContext;
+import org.skywalking.apm.collector.actor.LocalSyncWorkerRef;
+import org.skywalking.apm.collector.actor.LocalWorkerContext;
+import org.skywalking.apm.collector.actor.Role;
+
+/**
+ * The AbstractStreamPost
implementations represent workers, which called by the server to allow a servlet
+ * to handle a post request and get post the data by {@link BufferedReader}.
+ *
+ * verride the {@link #onReceive(BufferedReader, JsonObject)} method to deserialize the json construct data by buffer
+ * reader.
+ *
+ * @author pengys5
+ * @since v3.1-2017
+ */
+public abstract class AbstractStreamPost extends AbstractServlet {
+
+ public AbstractStreamPost(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+ super(role, clusterContext, selfContext);
+ }
+
+ /**
+ * Call the {@link #onReceive(BufferedReader, JsonObject)} method, build response data to writer if success or build
+ * error response data to writer if detected exception.
+ *
+ * @param reader {@link BufferedReader} json construct
+ * @param response {@link Object} is a out parameter
+ * @throws Exception
+ */
+ @Override final protected void onWork(Object reader, Object response) throws Exception {
+ JsonObject resJson = new JsonObject();
+ try {
+ onReceive((BufferedReader)reader, resJson);
+ onSuccessResponse((HttpServletResponse)response, resJson);
+ } catch (Exception e) {
+ onErrorResponse(e, (HttpServletResponse)response);
+ }
+ }
+
+ /**
+ * Override the default implementation, forbidden to call this method.
+ *
+ * @param parameter {@link Map}, get the request parameter by key.
+ * @param response {@link JsonObject}, set the response data as json object.
+ * @throws Exception
+ */
+ @Override final protected void onReceive(Map parameter, JsonObject response) throws Exception {
+ throw new IllegalAccessException("Use the other method with buffer reader parameter");
+ }
+
+ /**
+ * Override this method to implementing business logic.
+ *
+ * @param reader {@link BufferedReader} json construct
+ * @param response {@link Object} is a out parameter
+ * @throws Exception
+ */
+ protected abstract void onReceive(BufferedReader reader, JsonObject response) throws Exception;
+
+ static class StreamPostWithHttpServlet extends HttpServlet {
+
+ private Logger logger = LogManager.getFormatterLogger(AbstractPost.PostWithHttpServlet.class);
+
+ private final LocalSyncWorkerRef ownerWorkerRef;
+
+ StreamPostWithHttpServlet(LocalSyncWorkerRef ownerWorkerRef) {
+ this.ownerWorkerRef = ownerWorkerRef;
+ }
+
+ /**
+ * Override the {@link HttpServlet#doPost(HttpServletRequest, HttpServletResponse)} method, receive the
+ * buffer from request then send buffer to the owner worker.
+ *
+ * @param request an {@link HttpServletRequest} object that contains the request the client has made of the
+ * servlet
+ * @param response {@link HttpServletResponse} object that contains the response the servlet sends to the
+ * client
+ * @throws ServletException if the request for the POST could not be handled
+ * @throws IOException if an input or output error is detected when the servlet handles the request
+ */
+ @Override final protected void doPost(HttpServletRequest request,
+ HttpServletResponse response) throws ServletException, IOException {
+ try {
+ BufferedReader reader = request.getReader();
+ ownerWorkerRef.ask(reader, response);
+ } catch (Exception e) {
+ logger.error(e, e);
+ }
+ }
+ }
+}
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractStreamPostProvider.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractStreamPostProvider.java
new file mode 100644
index 0000000000..d2c6af40ca
--- /dev/null
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/AbstractStreamPostProvider.java
@@ -0,0 +1,35 @@
+package org.skywalking.apm.collector.worker.httpserver;
+
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
+import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
+import org.skywalking.apm.collector.actor.AbstractWorker;
+import org.skywalking.apm.collector.actor.LocalSyncWorkerRef;
+import org.skywalking.apm.collector.actor.ProviderNotFoundException;
+
+/**
+ * The AbstractStreamPostProvider
implementations represent providers, which use to create {@link
+ * AbstractStreamPost} worker instances.
+ *
+ * @author pengys5
+ * @since v3.1-2017
+ */
+public abstract class AbstractStreamPostProvider extends AbstractLocalSyncWorkerProvider {
+
+ public abstract String servletPath();
+
+ /**
+ * Create worker instance, http servlet and set the worker reference into servlet.
+ *
+ * @param context use to create a mapping between url and worker.
+ * @throws IllegalArgumentException
+ * @throws ProviderNotFoundException
+ */
+ final protected void create(
+ ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
+ LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef)super.create(AbstractWorker.noOwner());
+ AbstractStreamPost.StreamPostWithHttpServlet postWithHttpServlet = new AbstractStreamPost.StreamPostWithHttpServlet(workerRef);
+ context.addServlet(new ServletHolder(postWithHttpServlet), servletPath());
+ }
+}
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/ServletsCreator.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/ServletsCreator.java
index 816296d0ce..b79124de1f 100644
--- a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/ServletsCreator.java
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/httpserver/ServletsCreator.java
@@ -1,32 +1,50 @@
package org.skywalking.apm.collector.worker.httpserver;
+import java.util.ServiceLoader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
-import java.util.ServiceLoader;
-
/**
+ * The ServletsCreator
is a servlet workers starter. Use SPI to create the instances implement {@link
+ * AbstractGetProvider}, {@link AbstractPostProvider}, {@link AbstractStreamPostProvider}.
+ *
* @author pengys5
+ * @since v3.1-2017
*/
public enum ServletsCreator {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(ServletsCreator.class);
+ /**
+ * Use SPI to find the servlet workers provider then use the provider to create worker instances.
+ *
+ * @param servletContextHandler add a mapping between url and worker.
+ * @param clusterContext the context contains remote worker reference.
+ * @throws IllegalArgumentException
+ * @throws ProviderNotFoundException
+ */
public void boot(ServletContextHandler servletContextHandler,
- ClusterWorkerContext clusterContext) throws IllegalArgumentException, ProviderNotFoundException {
- ServiceLoader receiverLoader = java.util.ServiceLoader.load(AbstractPostProvider.class);
- for (AbstractPostProvider provider : receiverLoader) {
+ ClusterWorkerContext clusterContext) throws IllegalArgumentException, ProviderNotFoundException {
+ ServiceLoader postServletLoader = java.util.ServiceLoader.load(AbstractPostProvider.class);
+ for (AbstractPostProvider provider : postServletLoader) {
provider.setClusterContext(clusterContext);
provider.create(servletContextHandler);
logger.info("add post servlet mapping path: %s ", provider.servletPath());
}
- ServiceLoader searcherLoader = java.util.ServiceLoader.load(AbstractGetProvider.class);
- for (AbstractGetProvider provider : searcherLoader) {
+ ServiceLoader streamPostServletLoader = java.util.ServiceLoader.load(AbstractStreamPostProvider.class);
+ for (AbstractStreamPostProvider provider : streamPostServletLoader) {
+ provider.setClusterContext(clusterContext);
+ provider.create(servletContextHandler);
+ logger.info("add stream post servlet mapping path: %s ", provider.servletPath());
+ }
+
+ ServiceLoader getServletLoader = java.util.ServiceLoader.load(AbstractGetProvider.class);
+ for (AbstractGetProvider provider : getServletLoader) {
provider.setClusterContext(clusterContext);
provider.create(servletContextHandler);
logger.info("add get servlet mapping path: %s ", provider.servletPath());
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/noderef/NodeRefResSumGetGroupWithTimeSlice.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/noderef/NodeRefResSumGetGroupWithTimeSlice.java
index 8226cf2a4b..b7c6701cc8 100644
--- a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/noderef/NodeRefResSumGetGroupWithTimeSlice.java
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/noderef/NodeRefResSumGetGroupWithTimeSlice.java
@@ -1,6 +1,8 @@
package org.skywalking.apm.collector.worker.noderef;
import com.google.gson.JsonObject;
+import java.util.Arrays;
+import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
@@ -14,9 +16,6 @@ import org.skywalking.apm.collector.worker.httpserver.AbstractGetProvider;
import org.skywalking.apm.collector.worker.noderef.persistence.NodeRefResSumGroupWithTimeSlice;
import org.skywalking.apm.collector.worker.tools.ParameterTools;
-import java.util.Arrays;
-import java.util.Map;
-
/**
* @author pengys5
*/
@@ -33,30 +32,29 @@ public class NodeRefResSumGetGroupWithTimeSlice extends AbstractGet {
getClusterContext().findProvider(NodeRefResSumGroupWithTimeSlice.WorkerRole.INSTANCE).create(this);
}
- @Override
- protected void onSearch(Map request, JsonObject response) throws Exception {
- if (!request.containsKey("startTime") || !request.containsKey("endTime") || !request.containsKey("timeSliceType")) {
+ @Override protected void onReceive(Map parameter, JsonObject response) throws Exception {
+ if (!parameter.containsKey("startTime") || !parameter.containsKey("endTime") || !parameter.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains startTime,endTime,timeSliceType");
}
- logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(request.get("startTime")),
- Arrays.toString(request.get("endTime")), Arrays.toString(request.get("timeSliceType")));
+ logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(parameter.get("startTime")),
+ Arrays.toString(parameter.get("endTime")), Arrays.toString(parameter.get("timeSliceType")));
long startTime;
try {
- startTime = Long.valueOf(ParameterTools.INSTANCE.toString(request, "startTime"));
+ startTime = Long.valueOf(ParameterTools.INSTANCE.toString(parameter, "startTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter startTime must numeric with long type");
}
long endTime;
try {
- endTime = Long.valueOf(ParameterTools.INSTANCE.toString(request, "endTime"));
+ endTime = Long.valueOf(ParameterTools.INSTANCE.toString(parameter, "endTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter endTime must numeric with long type");
}
NodeRefResSumGroupWithTimeSlice.RequestEntity requestEntity;
- requestEntity = new NodeRefResSumGroupWithTimeSlice.RequestEntity(ParameterTools.INSTANCE.toString(request, "timeSliceType"), startTime, endTime);
+ requestEntity = new NodeRefResSumGroupWithTimeSlice.RequestEntity(ParameterTools.INSTANCE.toString(parameter, "timeSliceType"), startTime, endTime);
getSelfContext().lookup(NodeRefResSumGroupWithTimeSlice.WorkerRole.INSTANCE).ask(requestEntity, response);
}
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/segment/SegmentPost.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/segment/SegmentPost.java
index a066d9122b..c800f6eec2 100644
--- a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/segment/SegmentPost.java
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/segment/SegmentPost.java
@@ -1,19 +1,18 @@
package org.skywalking.apm.collector.worker.segment;
+import com.google.gson.JsonObject;
+import java.io.BufferedReader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.skywalking.apm.collector.worker.segment.entity.SegmentAndJson;
-import org.skywalking.apm.util.StringUtil;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
-import org.skywalking.apm.collector.worker.config.WorkerConfig;
import org.skywalking.apm.collector.worker.globaltrace.analysis.GlobalTraceAnalysis;
-import org.skywalking.apm.collector.worker.httpserver.AbstractPost;
-import org.skywalking.apm.collector.worker.httpserver.AbstractPostProvider;
+import org.skywalking.apm.collector.worker.httpserver.AbstractStreamPost;
+import org.skywalking.apm.collector.worker.httpserver.AbstractStreamPostProvider;
import org.skywalking.apm.collector.worker.node.analysis.NodeCompAnalysis;
import org.skywalking.apm.collector.worker.node.analysis.NodeMappingDayAnalysis;
import org.skywalking.apm.collector.worker.node.analysis.NodeMappingHourAnalysis;
@@ -25,13 +24,16 @@ import org.skywalking.apm.collector.worker.segment.analysis.SegmentAnalysis;
import org.skywalking.apm.collector.worker.segment.analysis.SegmentCostAnalysis;
import org.skywalking.apm.collector.worker.segment.analysis.SegmentExceptionAnalysis;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
+import org.skywalking.apm.collector.worker.segment.entity.SegmentAndJson;
+import org.skywalking.apm.collector.worker.segment.entity.SegmentDeserialize;
import org.skywalking.apm.collector.worker.storage.AbstractTimeSlice;
import org.skywalking.apm.collector.worker.tools.DateTools;
+import org.skywalking.apm.util.StringUtil;
/**
* @author pengys5
*/
-public class SegmentPost extends AbstractPost {
+public class SegmentPost extends AbstractStreamPost {
private static final Logger logger = LogManager.getFormatterLogger(SegmentPost.class);
public SegmentPost(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
@@ -57,37 +59,71 @@ public class SegmentPost extends AbstractPost {
getClusterContext().findProvider(NodeMappingMinuteAnalysis.Role.INSTANCE).create(this);
}
- @Override
- protected void onReceive(Object message) throws Exception {
- if (message instanceof SegmentAndJson) {
- SegmentAndJson segmentAndJson = (SegmentAndJson) message;
- Segment segment = segmentAndJson.getSegment();
- try {
- validateData(segment);
- } catch (IllegalArgumentException e) {
+ /**
+ * Read segment's buffer from buffer reader by stream mode. when finish read one segment then send to analysis.
+ * This method in there, so post servlet just can receive segments data.
+ *
+ * @param bufferedReader
+ * @param response
+ * @throws Exception
+ */
+ @Override protected void onReceive(BufferedReader bufferedReader, JsonObject response) throws Exception {
+ Segment segment;
+ do {
+ int character;
+ StringBuilder builder = new StringBuilder();
+ while ((character = bufferedReader.read()) != ' ') {
+ if (character == -1) {
+ return;
+ }
+ builder.append((char)character);
+ }
+
+ int length = Integer.valueOf(builder.toString());
+ builder = new StringBuilder();
+
+ char[] buffer = new char[length];
+ int readLength = bufferedReader.read(buffer, 0, length);
+ if (readLength != length) {
+ logger.error("The actual data length was different from the length in data head! ");
return;
}
+ builder.append(buffer);
- logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", segment.getTraceSegmentId());
+ String segmentJsonStr = builder.toString();
+ segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentJsonStr);
+ tellWorkers(new SegmentAndJson(segment, segmentJsonStr));
+ }
+ while (segment != null);
+ }
+
+ private void tellWorkers(SegmentAndJson segmentAndJson) throws Exception {
+ Segment segment = segmentAndJson.getSegment();
+ try {
+ validateData(segment);
+ } catch (IllegalArgumentException e) {
+ return;
+ }
- long minuteSlice = DateTools.getMinuteSlice(segment.getStartTime());
- long hourSlice = DateTools.getHourSlice(segment.getStartTime());
- long daySlice = DateTools.getDaySlice(segment.getStartTime());
- int second = DateTools.getSecond(segment.getStartTime());
- logger.debug("minuteSlice: %s, hourSlice: %s, daySlice: %s, second:%s", minuteSlice, hourSlice, daySlice, second);
+ logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", segment.getTraceSegmentId());
- SegmentWithTimeSlice segmentWithTimeSlice = new SegmentWithTimeSlice(segment, minuteSlice, hourSlice, daySlice, second);
- getSelfContext().lookup(SegmentAnalysis.Role.INSTANCE).tell(segmentAndJson);
+ long minuteSlice = DateTools.getMinuteSlice(segment.getStartTime());
+ long hourSlice = DateTools.getHourSlice(segment.getStartTime());
+ long daySlice = DateTools.getDaySlice(segment.getStartTime());
+ int second = DateTools.getSecond(segment.getStartTime());
+ logger.debug("minuteSlice: %s, hourSlice: %s, daySlice: %s, second:%s", minuteSlice, hourSlice, daySlice, second);
- getSelfContext().lookup(SegmentCostAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
- getSelfContext().lookup(GlobalTraceAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
- getSelfContext().lookup(SegmentExceptionAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
+ SegmentWithTimeSlice segmentWithTimeSlice = new SegmentWithTimeSlice(segment, minuteSlice, hourSlice, daySlice, second);
+ getSelfContext().lookup(SegmentAnalysis.Role.INSTANCE).tell(segmentAndJson);
- getSelfContext().lookup(NodeCompAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
+ getSelfContext().lookup(SegmentCostAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
+ getSelfContext().lookup(GlobalTraceAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
+ getSelfContext().lookup(SegmentExceptionAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
- tellNodeRef(segmentWithTimeSlice);
- tellNodeMapping(segmentWithTimeSlice);
- }
+ getSelfContext().lookup(NodeCompAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
+
+ tellNodeRef(segmentWithTimeSlice);
+ tellNodeMapping(segmentWithTimeSlice);
}
private void tellNodeRef(SegmentWithTimeSlice segmentWithTimeSlice) throws Exception {
@@ -111,17 +147,12 @@ public class SegmentPost extends AbstractPost {
}
}
- public static class Factory extends AbstractPostProvider {
+ public static class Factory extends AbstractStreamPostProvider {
@Override
public String servletPath() {
return "/segments";
}
- @Override
- public int queueSize() {
- return WorkerConfig.Queue.Segment.SegmentPost.SIZE;
- }
-
@Override
public Role role() {
return WorkerRole.INSTANCE;
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithGlobalTraceId.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithGlobalTraceId.java
index e7823e3040..8343be2f46 100644
--- a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithGlobalTraceId.java
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithGlobalTraceId.java
@@ -1,6 +1,8 @@
package org.skywalking.apm.collector.worker.segment;
import com.google.gson.JsonObject;
+import java.util.Arrays;
+import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
@@ -14,9 +16,6 @@ import org.skywalking.apm.collector.worker.httpserver.AbstractGetProvider;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentTopSearchWithGlobalTraceId;
import org.skywalking.apm.collector.worker.tools.ParameterTools;
-import java.util.Arrays;
-import java.util.Map;
-
/**
* @author pengys5
*/
@@ -33,29 +32,28 @@ public class SegmentTopGetWithGlobalTraceId extends AbstractGet {
getClusterContext().findProvider(SegmentTopSearchWithGlobalTraceId.WorkerRole.INSTANCE).create(this);
}
- @Override
- protected void onSearch(Map request, JsonObject response) throws Exception {
- if (!request.containsKey("globalTraceId") || !request.containsKey("from") || !request.containsKey("limit")) {
+ @Override protected void onReceive(Map parameter, JsonObject response) throws Exception {
+ if (!parameter.containsKey("globalTraceId") || !parameter.containsKey("from") || !parameter.containsKey("limit")) {
throw new IllegalArgumentException("the request parameter must contains globalTraceId, from, limit");
}
- logger.debug("globalTraceId: %s, from: %s, limit: %s", Arrays.toString(request.get("globalTraceId")),
- Arrays.toString(request.get("from")), Arrays.toString(request.get("limit")));
+ logger.debug("globalTraceId: %s, from: %s, limit: %s", Arrays.toString(parameter.get("globalTraceId")),
+ Arrays.toString(parameter.get("from")), Arrays.toString(parameter.get("limit")));
int from;
try {
- from = Integer.valueOf(ParameterTools.INSTANCE.toString(request, "from"));
+ from = Integer.valueOf(ParameterTools.INSTANCE.toString(parameter, "from"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter from must numeric with int type");
}
int limit;
try {
- limit = Integer.valueOf(ParameterTools.INSTANCE.toString(request, "limit"));
+ limit = Integer.valueOf(ParameterTools.INSTANCE.toString(parameter, "limit"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter from must numeric with int type");
}
- String globalTraceId = ParameterTools.INSTANCE.toString(request, "globalTraceId");
+ String globalTraceId = ParameterTools.INSTANCE.toString(parameter, "globalTraceId");
SegmentTopSearchWithGlobalTraceId.RequestEntity requestEntity = new SegmentTopSearchWithGlobalTraceId.RequestEntity(globalTraceId, from, limit);
getSelfContext().lookup(SegmentTopSearchWithGlobalTraceId.WorkerRole.INSTANCE).ask(requestEntity, response);
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithTimeSlice.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithTimeSlice.java
index 74dfbfb299..6da7e9dde6 100644
--- a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithTimeSlice.java
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithTimeSlice.java
@@ -1,6 +1,8 @@
package org.skywalking.apm.collector.worker.segment;
import com.google.gson.JsonObject;
+import java.util.Arrays;
+import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
@@ -14,9 +16,6 @@ import org.skywalking.apm.collector.worker.httpserver.AbstractGetProvider;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentTopSearchWithTimeSlice;
import org.skywalking.apm.collector.worker.tools.ParameterTools;
-import java.util.Arrays;
-import java.util.Map;
-
/**
* @author pengys5
*/
@@ -33,49 +32,48 @@ public class SegmentTopGetWithTimeSlice extends AbstractGet {
getClusterContext().findProvider(SegmentTopSearchWithTimeSlice.WorkerRole.INSTANCE).create(this);
}
- @Override
- protected void onSearch(Map request, JsonObject response) throws Exception {
- if (!request.containsKey("startTime") || !request.containsKey("endTime") || !request.containsKey("from") || !request.containsKey("limit")) {
+ @Override protected void onReceive(Map parameter, JsonObject response) throws Exception {
+ if (!parameter.containsKey("startTime") || !parameter.containsKey("endTime") || !parameter.containsKey("from") || !parameter.containsKey("limit")) {
throw new IllegalArgumentException("the request parameter must contains startTime, endTime, from, limit");
}
- logger.debug("startTime: %s, endTime: %s, from: %s", Arrays.toString(request.get("startTime")),
- Arrays.toString(request.get("endTime")), Arrays.toString(request.get("from")));
+ logger.debug("startTime: %s, endTime: %s, from: %s", Arrays.toString(parameter.get("startTime")),
+ Arrays.toString(parameter.get("endTime")), Arrays.toString(parameter.get("from")));
long startTime;
try {
- startTime = Long.valueOf(ParameterTools.INSTANCE.toString(request, "startTime"));
+ startTime = Long.valueOf(ParameterTools.INSTANCE.toString(parameter, "startTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter startTime must numeric with long type");
}
long endTime;
try {
- endTime = Long.valueOf(ParameterTools.INSTANCE.toString(request, "endTime"));
+ endTime = Long.valueOf(ParameterTools.INSTANCE.toString(parameter, "endTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter endTime must numeric with long type");
}
int from;
try {
- from = Integer.valueOf(ParameterTools.INSTANCE.toString(request, "from"));
+ from = Integer.valueOf(ParameterTools.INSTANCE.toString(parameter, "from"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter from must numeric with int type");
}
int limit;
try {
- limit = Integer.valueOf(ParameterTools.INSTANCE.toString(request, "limit"));
+ limit = Integer.valueOf(ParameterTools.INSTANCE.toString(parameter, "limit"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter from must numeric with int type");
}
int minCost = -1;
- if (request.containsKey("minCost")) {
- minCost = Integer.valueOf(ParameterTools.INSTANCE.toString(request, "minCost"));
+ if (parameter.containsKey("minCost")) {
+ minCost = Integer.valueOf(ParameterTools.INSTANCE.toString(parameter, "minCost"));
}
int maxCost = -1;
- if (request.containsKey("maxCost")) {
- maxCost = Integer.valueOf(ParameterTools.INSTANCE.toString(request, "maxCost"));
+ if (parameter.containsKey("maxCost")) {
+ maxCost = Integer.valueOf(ParameterTools.INSTANCE.toString(parameter, "maxCost"));
}
SegmentTopSearchWithTimeSlice.RequestEntity requestEntity;
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/span/SpanGetWithId.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/span/SpanGetWithId.java
index 3f5224675e..8110637715 100644
--- a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/span/SpanGetWithId.java
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/span/SpanGetWithId.java
@@ -1,6 +1,8 @@
package org.skywalking.apm.collector.worker.span;
import com.google.gson.JsonObject;
+import java.util.Arrays;
+import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
@@ -14,9 +16,6 @@ import org.skywalking.apm.collector.worker.httpserver.AbstractGetProvider;
import org.skywalking.apm.collector.worker.span.persistence.SpanSearchWithId;
import org.skywalking.apm.collector.worker.tools.ParameterTools;
-import java.util.Arrays;
-import java.util.Map;
-
/**
* @author pengys5
*/
@@ -33,15 +32,14 @@ public class SpanGetWithId extends AbstractGet {
getClusterContext().findProvider(SpanSearchWithId.WorkerRole.INSTANCE).create(this);
}
- @Override
- protected void onSearch(Map request, JsonObject response) throws Exception {
- if (!request.containsKey("segId") || !request.containsKey("spanId")) {
+ @Override protected void onReceive(Map parameter, JsonObject response) throws Exception {
+ if (!parameter.containsKey("segId") || !parameter.containsKey("spanId")) {
throw new IllegalArgumentException("the request parameter must contains segId, spanId");
}
- logger.debug("segId: %s, spanId: %s", Arrays.toString(request.get("segId")), Arrays.toString(request.get("spanId")));
+ logger.debug("segId: %s, spanId: %s", Arrays.toString(parameter.get("segId")), Arrays.toString(parameter.get("spanId")));
- String segId = ParameterTools.INSTANCE.toString(request, "segId");
- String spanId = ParameterTools.INSTANCE.toString(request, "spanId");
+ String segId = ParameterTools.INSTANCE.toString(parameter, "segId");
+ String spanId = ParameterTools.INSTANCE.toString(parameter, "spanId");
SpanSearchWithId.RequestEntity requestEntity = new SpanSearchWithId.RequestEntity(segId, spanId);
getSelfContext().lookup(SpanSearchWithId.WorkerRole.INSTANCE).ask(requestEntity, response);
diff --git a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/tracedag/TraceDagGetWithTimeSlice.java b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/tracedag/TraceDagGetWithTimeSlice.java
index 5c94ffec2b..b882d88664 100644
--- a/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/tracedag/TraceDagGetWithTimeSlice.java
+++ b/apm-collector/apm-collector-worker/src/main/java/org/skywalking/apm/collector/worker/tracedag/TraceDagGetWithTimeSlice.java
@@ -1,6 +1,8 @@
package org.skywalking.apm.collector.worker.tracedag;
import com.google.gson.JsonObject;
+import java.util.Arrays;
+import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
@@ -18,9 +20,6 @@ import org.skywalking.apm.collector.worker.noderef.persistence.NodeRefResSumSear
import org.skywalking.apm.collector.worker.noderef.persistence.NodeRefSearchWithTimeSlice;
import org.skywalking.apm.collector.worker.tools.ParameterTools;
-import java.util.Arrays;
-import java.util.Map;
-
/**
* @author pengys5
*/
@@ -40,29 +39,28 @@ public class TraceDagGetWithTimeSlice extends AbstractGet {
getClusterContext().findProvider(NodeRefResSumSearchWithTimeSlice.WorkerRole.INSTANCE).create(this);
}
- @Override
- protected void onSearch(Map request, JsonObject response) throws Exception {
- if (!request.containsKey("startTime") || !request.containsKey("endTime") || !request.containsKey("timeSliceType")) {
+ @Override protected void onReceive(Map parameter, JsonObject response) throws Exception {
+ if (!parameter.containsKey("startTime") || !parameter.containsKey("endTime") || !parameter.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains startTime,endTime,timeSliceType");
}
- logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(request.get("startTime")),
- Arrays.toString(request.get("endTime")), Arrays.toString(request.get("timeSliceType")));
+ logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(parameter.get("startTime")),
+ Arrays.toString(parameter.get("endTime")), Arrays.toString(parameter.get("timeSliceType")));
long startTime;
try {
- startTime = Long.valueOf(ParameterTools.INSTANCE.toString(request, "startTime"));
+ startTime = Long.valueOf(ParameterTools.INSTANCE.toString(parameter, "startTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter startTime must numeric with long type");
}
long endTime;
try {
- endTime = Long.valueOf(ParameterTools.INSTANCE.toString(request, "endTime"));
+ endTime = Long.valueOf(ParameterTools.INSTANCE.toString(parameter, "endTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter endTime must numeric with long type");
}
- String timeSliceType = ParameterTools.INSTANCE.toString(request, "timeSliceType");
+ String timeSliceType = ParameterTools.INSTANCE.toString(parameter, "timeSliceType");
JsonObject compResponse = getNewResponse();
getSelfContext().lookup(NodeCompLoad.WorkerRole.INSTANCE).ask(null, compResponse);
diff --git a/apm-collector/apm-collector-worker/src/main/resources/META-INF/services/org.skywalking.apm.collector.worker.httpserver.AbstractPostProvider b/apm-collector/apm-collector-worker/src/main/resources/META-INF/services/org.skywalking.apm.collector.worker.httpserver.AbstractPostProvider
index 6d8279b06c..e69de29bb2 100644
--- a/apm-collector/apm-collector-worker/src/main/resources/META-INF/services/org.skywalking.apm.collector.worker.httpserver.AbstractPostProvider
+++ b/apm-collector/apm-collector-worker/src/main/resources/META-INF/services/org.skywalking.apm.collector.worker.httpserver.AbstractPostProvider
@@ -1 +0,0 @@
-org.skywalking.apm.collector.worker.segment.SegmentPost$Factory
\ No newline at end of file
diff --git a/apm-collector/apm-collector-worker/src/main/resources/META-INF/services/org.skywalking.apm.collector.worker.httpserver.AbstractStreamPostProvider b/apm-collector/apm-collector-worker/src/main/resources/META-INF/services/org.skywalking.apm.collector.worker.httpserver.AbstractStreamPostProvider
new file mode 100644
index 0000000000..6d8279b06c
--- /dev/null
+++ b/apm-collector/apm-collector-worker/src/main/resources/META-INF/services/org.skywalking.apm.collector.worker.httpserver.AbstractStreamPostProvider
@@ -0,0 +1 @@
+org.skywalking.apm.collector.worker.segment.SegmentPost$Factory
\ No newline at end of file
diff --git a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/globaltrace/GlobalTraceGetWithGlobalIdTestCase.java b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/globaltrace/GlobalTraceGetWithGlobalIdTestCase.java
index e49b9c2bc5..9b1e7e4efc 100644
--- a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/globaltrace/GlobalTraceGetWithGlobalIdTestCase.java
+++ b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/globaltrace/GlobalTraceGetWithGlobalIdTestCase.java
@@ -81,14 +81,14 @@ public class GlobalTraceGetWithGlobalIdTestCase {
request.put("globalId", globalId);
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
public void testOnSearchError() throws Exception {
Map request = new HashMap<>();
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
class GlobalTraceAnswerGet implements Answer {
diff --git a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/AbstractGetTestCase.java b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/AbstractGetTestCase.java
index a5e390ac0c..73f5f3fde7 100644
--- a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/AbstractGetTestCase.java
+++ b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/AbstractGetTestCase.java
@@ -1,15 +1,24 @@
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonObject;
+import java.io.IOException;
+import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;
+import javax.servlet.http.HttpServletResponse;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* @author pengys5
@@ -18,23 +27,37 @@ public class AbstractGetTestCase {
private TestAbstractGet get = spy(new TestAbstractGet(TestAbstractGet.WorkerRole.INSTANCE, null, null));
+ private HttpServletResponse response;
+ private PrintWriter writer;
+
+ @Before
+ public void init() throws IOException {
+ writer = mock(PrintWriter.class);
+ response = mock(HttpServletResponse.class);
+ when(response.getWriter()).thenReturn(writer);
+ }
+
@Test
public void testOnWork() throws Exception {
Map parameterMap = new HashMap<>();
- JsonObject response = new JsonObject();
get.onWork(parameterMap, response);
- verify(get).onSearch(any(Map.class), any(JsonObject.class));
+ verify(get).onReceive(any(Map.class), any(JsonObject.class));
}
@Test
public void testOnWorkError() throws Exception {
Map parameterMap = new HashMap<>();
- JsonObject response = new JsonObject();
- doThrow(new Exception("testOnWorkError")).when(get).onSearch(any(Map.class), any(JsonObject.class));
- get.onWork(parameterMap, response);
+ doAnswer(new Answer() {
+ @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+ JsonObject response = (JsonObject)invocation.getArguments()[0];
+ Assert.assertEquals(false, response.get("isSuccess").getAsBoolean());
+ Assert.assertEquals("testOnWorkError", response.get("reason").getAsString());
+ return null;
+ }
+ }).when(writer).print(any(JsonObject.class));
- Assert.assertEquals(false, response.get("isSuccess").getAsBoolean());
- Assert.assertEquals("testOnWorkError", response.get("reason").getAsString());
+ doThrow(new Exception("testOnWorkError")).when(get).onReceive(any(Map.class), any(JsonObject.class));
+ get.onWork(parameterMap, response);
}
}
diff --git a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/AbstractPostTestCase.java b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/AbstractPostTestCase.java
index c4306509d2..43f7771b61 100644
--- a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/AbstractPostTestCase.java
+++ b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/AbstractPostTestCase.java
@@ -2,10 +2,12 @@ package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonObject;
import java.io.BufferedReader;
-import java.io.FileReader;
+import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.Writer;
+import java.util.HashMap;
+import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.junit.Before;
@@ -19,7 +21,6 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.worker.segment.mock.SegmentMock;
-import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -35,9 +36,15 @@ import static org.mockito.Mockito.when;
public class AbstractPostTestCase {
private TestAbstractPost post;
+ private HttpServletResponse response;
+ private PrintWriter writer;
@Before
- public void init() {
+ public void init() throws IOException {
+ writer = mock(PrintWriter.class);
+ response = mock(HttpServletResponse.class);
+ when(response.getWriter()).thenReturn(writer);
+
ClusterWorkerContext clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
LocalWorkerContext localWorkerContext = PowerMockito.mock(LocalWorkerContext.class);
post = spy(new TestAbstractPost(TestAbstractPost.WorkerRole.INSTANCE, clusterWorkerContext, localWorkerContext));
@@ -45,14 +52,17 @@ public class AbstractPostTestCase {
@Test
public void testOnWork() throws Exception {
- String request = "testOnWork";
- post.onWork(request);
- verify(post).onReceive(anyString());
+ Map parameter = new HashMap<>();
+ parameter.put("Test", new String[] {
+ "test"
+ });
+ post.onWork(parameter, response);
+ verify(post).onReceive(any(Map.class), any(JsonObject.class));
}
@Test
public void testOnWorkError() throws Exception {
- post.onWork(new JsonObject());
+ post.onWork(new HashMap(), response);
PowerMockito.verifyPrivate(post).invoke("saveException", any(IllegalArgumentException.class));
}
diff --git a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/PostWithHttpServletTestCase.java b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/PostWithHttpServletTestCase.java
index 35c595bad4..5e26aadc10 100644
--- a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/PostWithHttpServletTestCase.java
+++ b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/PostWithHttpServletTestCase.java
@@ -1,28 +1,32 @@
package org.skywalking.apm.collector.worker.httpserver;
+import java.io.BufferedReader;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.skywalking.apm.collector.actor.LocalAsyncWorkerRef;
+import org.skywalking.apm.collector.actor.LocalSyncWorkerRef;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.BufferedReader;
-import java.io.PrintWriter;
-import java.io.StringReader;
-
import static org.mockito.Matchers.anyInt;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* @author pengys5
*/
public class PostWithHttpServletTestCase {
- private LocalAsyncWorkerRef workerRef;
+ private LocalSyncWorkerRef workerRef;
private AbstractPost.PostWithHttpServlet servlet;
private HttpServletRequest request;
private HttpServletResponse response;
@@ -30,7 +34,7 @@ public class PostWithHttpServletTestCase {
@Before
public void init() throws Exception {
- workerRef = mock(LocalAsyncWorkerRef.class);
+ workerRef = mock(LocalSyncWorkerRef.class);
servlet = new AbstractPost.PostWithHttpServlet(workerRef);
request = mock(HttpServletRequest.class);
@@ -46,7 +50,7 @@ public class PostWithHttpServletTestCase {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
- Integer status = (Integer) invocation.getArguments()[0];
+ Integer status = (Integer)invocation.getArguments()[0];
Assert.assertEquals(new Integer(200), status);
return null;
}
@@ -55,7 +59,7 @@ public class PostWithHttpServletTestCase {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
- Segment segment = (Segment) invocation.getArguments()[0];
+ Segment segment = (Segment)invocation.getArguments()[0];
Assert.assertEquals("TestTest2", segment.getTraceSegmentId());
return null;
}
@@ -73,7 +77,7 @@ public class PostWithHttpServletTestCase {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
- Integer status = (Integer) invocation.getArguments()[0];
+ Integer status = (Integer)invocation.getArguments()[0];
Assert.assertEquals(new Integer(500), status);
return null;
}
diff --git a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/TestAbstractGet.java b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/TestAbstractGet.java
index 333f608652..58cca2c7fa 100644
--- a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/TestAbstractGet.java
+++ b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/TestAbstractGet.java
@@ -1,6 +1,8 @@
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonObject;
+import java.util.Map;
+import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
@@ -8,8 +10,6 @@ import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
-import java.util.Map;
-
/**
* @author pengys5
*/
@@ -23,8 +23,7 @@ public class TestAbstractGet extends AbstractGet {
super.preStart();
}
- @Override
- protected void onSearch(Map request, JsonObject response) throws Exception {
+ @Override protected void onReceive(Map parameter, JsonObject response) throws Exception {
}
diff --git a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/TestAbstractPost.java b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/TestAbstractPost.java
index e65c0ef5f2..802f365fd4 100644
--- a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/TestAbstractPost.java
+++ b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/TestAbstractPost.java
@@ -1,5 +1,7 @@
package org.skywalking.apm.collector.worker.httpserver;
+import com.google.gson.JsonObject;
+import java.util.Map;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
@@ -20,8 +22,8 @@ public class TestAbstractPost extends AbstractPost {
super.preStart();
}
- @Override
- protected void onReceive(Object message) throws Exception {
+ @Override protected void onReceive(Map parameter, JsonObject response) throws Exception {
+
}
public enum WorkerRole implements Role {
@@ -44,11 +46,6 @@ public class TestAbstractPost extends AbstractPost {
return "/TestAbstractPost";
}
- @Override
- public int queueSize() {
- return 4;
- }
-
@Override
public Role role() {
return TestAbstractPost.WorkerRole.INSTANCE;
diff --git a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/TestAbstractPostProvider.java b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/TestAbstractPostProvider.java
index f90ce8afa2..a2803632ea 100644
--- a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/TestAbstractPostProvider.java
+++ b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/httpserver/TestAbstractPostProvider.java
@@ -8,11 +8,6 @@ import org.skywalking.apm.collector.actor.Role;
* @author pengys5
*/
public class TestAbstractPostProvider extends AbstractPostProvider {
- @Override
- public int queueSize() {
- return 4;
- }
-
@Override
public String servletPath() {
return "testPost";
diff --git a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/noderef/NodeRefResSumGetGroupWithTimeSliceTestCase.java b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/noderef/NodeRefResSumGetGroupWithTimeSliceTestCase.java
index 60e946a237..5101feec38 100644
--- a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/noderef/NodeRefResSumGetGroupWithTimeSliceTestCase.java
+++ b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/noderef/NodeRefResSumGetGroupWithTimeSliceTestCase.java
@@ -1,6 +1,9 @@
package org.skywalking.apm.collector.worker.noderef;
import com.google.gson.JsonObject;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -20,18 +23,17 @@ import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.worker.noderef.persistence.NodeRefResSumGroupWithTimeSlice;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TimeZone;
-
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest( {ClusterWorkerContext.class})
-@PowerMockIgnore( {"javax.management.*"})
+@PrepareForTest({ClusterWorkerContext.class})
+@PowerMockIgnore({"javax.management.*"})
public class NodeRefResSumGetGroupWithTimeSliceTestCase {
private NodeRefResSumGetGroupWithTimeSlice getObj;
@@ -79,7 +81,7 @@ public class NodeRefResSumGetGroupWithTimeSliceTestCase {
}
@Test
- public void testOnSearch() throws Exception {
+ public void testOnReceive() throws Exception {
Map request = new HashMap<>();
String[] startTime = {"100"};
request.put("startTime", startTime);
@@ -89,18 +91,18 @@ public class NodeRefResSumGetGroupWithTimeSliceTestCase {
request.put("timeSliceType", timeSliceType);
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
- public void testOnSearchError() throws Exception {
+ public void testOnReceiveError() throws Exception {
Map request = new HashMap<>();
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
- public void testOnSearchStartTimeError() throws Exception {
+ public void testOnReceiveStartTimeError() throws Exception {
Map request = new HashMap<>();
String[] startTime = {"x"};
request.put("startTime", startTime);
@@ -109,11 +111,12 @@ public class NodeRefResSumGetGroupWithTimeSliceTestCase {
String[] timeSliceType = {"minute"};
request.put("timeSliceType", timeSliceType);
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+
+ getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
- public void testOnSearchEndTimeError() throws Exception {
+ public void testOnReceiveEndTimeError() throws Exception {
Map request = new HashMap<>();
String[] startTime = {"100"};
request.put("startTime", startTime);
@@ -122,14 +125,14 @@ public class NodeRefResSumGetGroupWithTimeSliceTestCase {
String[] timeSliceType = {"minute"};
request.put("timeSliceType", timeSliceType);
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
class NodeRefResSumGetAnswerGet implements Answer {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
- NodeRefResSumGroupWithTimeSlice.RequestEntity requestEntity = (NodeRefResSumGroupWithTimeSlice.RequestEntity) invocation.getArguments()[0];
+ NodeRefResSumGroupWithTimeSlice.RequestEntity requestEntity = (NodeRefResSumGroupWithTimeSlice.RequestEntity)invocation.getArguments()[0];
Assert.assertEquals(100L, requestEntity.getStartTime());
Assert.assertEquals(200L, requestEntity.getEndTime());
Assert.assertEquals("minute", requestEntity.getSliceType());
diff --git a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentPostTestCase.java b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentPostTestCase.java
index 196311d79d..644f4b9764 100644
--- a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentPostTestCase.java
+++ b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentPostTestCase.java
@@ -1,7 +1,9 @@
package org.skywalking.apm.collector.worker.segment;
-import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
+import java.io.BufferedReader;
+import java.io.StringReader;
+import java.util.TimeZone;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Assert;
@@ -17,15 +19,23 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import org.skywalking.apm.collector.actor.*;
+import org.skywalking.apm.collector.actor.ClusterWorkerContext;
+import org.skywalking.apm.collector.actor.LocalWorkerContext;
+import org.skywalking.apm.collector.actor.ProviderNotFoundException;
+import org.skywalking.apm.collector.actor.Role;
+import org.skywalking.apm.collector.actor.WorkerRef;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
-import org.skywalking.apm.collector.worker.config.WorkerConfig;
import org.skywalking.apm.collector.worker.globaltrace.analysis.GlobalTraceAnalysis;
import org.skywalking.apm.collector.worker.node.analysis.NodeCompAnalysis;
import org.skywalking.apm.collector.worker.node.analysis.NodeMappingDayAnalysis;
import org.skywalking.apm.collector.worker.node.analysis.NodeMappingHourAnalysis;
import org.skywalking.apm.collector.worker.node.analysis.NodeMappingMinuteAnalysis;
-import org.skywalking.apm.collector.worker.noderef.analysis.*;
+import org.skywalking.apm.collector.worker.noderef.analysis.NodeRefDayAnalysis;
+import org.skywalking.apm.collector.worker.noderef.analysis.NodeRefHourAnalysis;
+import org.skywalking.apm.collector.worker.noderef.analysis.NodeRefMinuteAnalysis;
+import org.skywalking.apm.collector.worker.noderef.analysis.NodeRefResSumDayAnalysis;
+import org.skywalking.apm.collector.worker.noderef.analysis.NodeRefResSumHourAnalysis;
+import org.skywalking.apm.collector.worker.noderef.analysis.NodeRefResSumMinuteAnalysis;
import org.skywalking.apm.collector.worker.segment.analysis.SegmentAnalysis;
import org.skywalking.apm.collector.worker.segment.analysis.SegmentCostAnalysis;
import org.skywalking.apm.collector.worker.segment.analysis.SegmentExceptionAnalysis;
@@ -35,9 +45,8 @@ import org.skywalking.apm.collector.worker.segment.persistence.SegmentExceptionS
import org.skywalking.apm.collector.worker.segment.persistence.SegmentSave;
import org.skywalking.apm.collector.worker.tools.DateTools;
-import java.util.TimeZone;
-
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.doAnswer;
import static org.powermock.api.mockito.PowerMockito.doReturn;
@@ -47,8 +56,8 @@ import static org.powermock.api.mockito.PowerMockito.mock;
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest( {LocalWorkerContext.class, WorkerRef.class})
-@PowerMockIgnore( {"javax.management.*"})
+@PrepareForTest({LocalWorkerContext.class, WorkerRef.class})
+@PowerMockIgnore({"javax.management.*"})
public class SegmentPostTestCase {
private Logger logger = LogManager.getFormatterLogger(SegmentPostTestCase.class);
@@ -90,10 +99,6 @@ public class SegmentPostTestCase {
Assert.assertEquals(SegmentPost.class.getSimpleName(), factory.role().roleName());
Assert.assertEquals(SegmentPost.class.getSimpleName(), factory.workerInstance(null).getClass().getSimpleName());
Assert.assertEquals("/segments", factory.servletPath());
-
- int testSize = 10;
- WorkerConfig.Queue.Segment.SegmentPost.SIZE = testSize;
- Assert.assertEquals(testSize, factory.queueSize());
}
@Test
@@ -166,12 +171,14 @@ public class SegmentPostTestCase {
@Test
public void testValidateData() throws Exception {
- JsonArray segmentArray = new JsonArray();
JsonObject segmentJsonObj = new JsonObject();
segmentJsonObj.addProperty("et", 1491277162066L);
- segmentArray.add(segmentJsonObj);
+ String jsonStr = segmentJsonObj.toString();
+
+ JsonObject response = new JsonObject();
- segmentPost.onReceive(segmentArray.toString());
+ BufferedReader reader = new BufferedReader(new StringReader(jsonStr.length() + " " + jsonStr));
+ segmentPost.onReceive(reader, response);
}
private SegmentSaveAnswer segmentSaveAnswer_1;
@@ -297,7 +304,7 @@ public class SegmentPostTestCase {
public void testOnReceive() throws Exception {
String cacheServiceSegmentAsString = segmentMock.mockCacheServiceSegmentAsString();
- segmentPost.onReceive(cacheServiceSegmentAsString);
+ segmentPost.onReceive(new BufferedReader(new StringReader(cacheServiceSegmentAsString)), new JsonObject());
Assert.assertEquals(DateTools.changeToUTCSlice(201703310915L), segmentSaveAnswer_1.minute);
Assert.assertEquals(DateTools.changeToUTCSlice(201703310900L), segmentSaveAnswer_1.hour);
@@ -338,7 +345,7 @@ public class SegmentPostTestCase {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
- segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) invocation.getArguments()[0];
+ segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)invocation.getArguments()[0];
return null;
}
}
@@ -350,7 +357,7 @@ public class SegmentPostTestCase {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
- JsonObject jsonObject = (JsonObject) invocation.getArguments()[0];
+ JsonObject jsonObject = (JsonObject)invocation.getArguments()[0];
logger.info("SegmentSave json: " + jsonObject.toString());
minute = jsonObject.get("minute").getAsLong();
hour = jsonObject.get("hour").getAsLong();
@@ -363,7 +370,7 @@ public class SegmentPostTestCase {
private static final String SegId = "Segment.1490922929258.927784221.5991.27.1";
public boolean matches(Object para) {
- JsonObject paraJson = (JsonObject) para;
+ JsonObject paraJson = (JsonObject)para;
return SegId.equals(paraJson.get("ts").getAsString());
}
}
@@ -372,7 +379,7 @@ public class SegmentPostTestCase {
private static final String SegId = "Segment.1490922929298.927784221.5991.28.1";
public boolean matches(Object para) {
- JsonObject paraJson = (JsonObject) para;
+ JsonObject paraJson = (JsonObject)para;
return SegId.equals(paraJson.get("ts").getAsString());
}
}
diff --git a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentRealPost.java b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentRealPost.java
index b0028527a9..0abb2727b5 100644
--- a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentRealPost.java
+++ b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentRealPost.java
@@ -16,6 +16,7 @@ public class SegmentRealPost {
// HttpClientTools.INSTANCE.post("http://localhost:7001/segments", portalServiceExceptionSegmentAsString);
String cacheServiceSegmentAsString = mock.mockCacheServiceSegmentAsString();
+ System.out.println(cacheServiceSegmentAsString);
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", cacheServiceSegmentAsString);
String persistenceServiceSegmentAsString = mock.mockPersistenceServiceSegmentAsString();
diff --git a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithGlobalTraceIdTestCase.java b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithGlobalTraceIdTestCase.java
index 1e3b7f8d57..ca787e3c42 100644
--- a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithGlobalTraceIdTestCase.java
+++ b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithGlobalTraceIdTestCase.java
@@ -97,14 +97,14 @@ public class SegmentTopGetWithGlobalTraceIdTestCase {
request.put("limit", limit);
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
public void testOnSearchError() throws Exception {
Map request = new HashMap<>();
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@@ -118,7 +118,7 @@ public class SegmentTopGetWithGlobalTraceIdTestCase {
request.put("limit", limit);
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@@ -132,7 +132,7 @@ public class SegmentTopGetWithGlobalTraceIdTestCase {
request.put("limit", limit);
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
class SegmentTopGetAnswerGet implements Answer {
diff --git a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithTimeSliceTestCase.java b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithTimeSliceTestCase.java
index ca9771d94c..e3eb264c69 100644
--- a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithTimeSliceTestCase.java
+++ b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/SegmentTopGetWithTimeSliceTestCase.java
@@ -90,7 +90,7 @@ public class SegmentTopGetWithTimeSliceTestCase {
public void testOnSearch() throws Exception {
Map request = createRequest();
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@@ -100,7 +100,7 @@ public class SegmentTopGetWithTimeSliceTestCase {
request.put("startTime", startTime);
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@@ -110,7 +110,7 @@ public class SegmentTopGetWithTimeSliceTestCase {
request.put("endTime", endTime);
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@@ -120,7 +120,7 @@ public class SegmentTopGetWithTimeSliceTestCase {
request.put("from", from);
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@@ -130,7 +130,7 @@ public class SegmentTopGetWithTimeSliceTestCase {
request.put("limit", limit);
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
private Map createRequest() {
diff --git a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/mock/SegmentMock.java b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/mock/SegmentMock.java
index f14ca82ea5..8bb6b4ccd4 100644
--- a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/mock/SegmentMock.java
+++ b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/segment/mock/SegmentMock.java
@@ -32,7 +32,7 @@ public class SegmentMock {
}
public String mockCacheServiceSegmentAsString() throws FileNotFoundException {
- return JsonFileReader.INSTANCE.read(CacheServiceJsonFile);
+ return JsonFileReader.INSTANCE.readSegment(CacheServiceJsonFile);
}
public String mockCacheServiceExceptionSegmentAsString() throws FileNotFoundException {
diff --git a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/span/SpanGetWithIdTestCase.java b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/span/SpanGetWithIdTestCase.java
index 796efb27c5..39aac5b114 100644
--- a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/span/SpanGetWithIdTestCase.java
+++ b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/span/SpanGetWithIdTestCase.java
@@ -86,14 +86,14 @@ public class SpanGetWithIdTestCase {
String[] spanId = {"20"};
request.put("spanId", spanId);
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
public void testOnSearchError() throws Exception {
Map request = new HashMap<>();
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
class SpanGetAnswerGet implements Answer {
diff --git a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/tracedag/TraceDagGetWithTimeSliceTestCase.java b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/tracedag/TraceDagGetWithTimeSliceTestCase.java
index d53c1e0501..eeb7f3b443 100644
--- a/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/tracedag/TraceDagGetWithTimeSliceTestCase.java
+++ b/apm-collector/apm-collector-worker/src/test/java/org/skywalking/apm/collector/worker/tracedag/TraceDagGetWithTimeSliceTestCase.java
@@ -105,7 +105,7 @@ public class TraceDagGetWithTimeSliceTestCase {
public void testOnSearchError() throws Exception {
Map request = new HashMap<>();
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@@ -115,7 +115,7 @@ public class TraceDagGetWithTimeSliceTestCase {
request.put("startTime", startTime);
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@@ -125,7 +125,7 @@ public class TraceDagGetWithTimeSliceTestCase {
request.put("endTime", endTime);
JsonObject response = new JsonObject();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
private Map createRequest() {
@@ -149,7 +149,7 @@ public class TraceDagGetWithTimeSliceTestCase {
PowerMockito.when(getObj, "getNewResponse").thenReturn(response);
Map request = createRequest();
- getObj.onSearch(request, response);
+ getObj.onReceive(request, response);
}
class TraceDagGetAnswerGet_1 implements Answer {
diff --git a/apm-sniffer/apm-sdk-plugin/feign-default-http-9.x-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/feign-default-http-9.x-plugin/pom.xml
index 43503165fb..6a303687b8 100644
--- a/apm-sniffer/apm-sdk-plugin/feign-default-http-9.x-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/feign-default-http-9.x-plugin/pom.xml
@@ -21,6 +21,7 @@
io.github.openfeign
feign-core
${feign.version}
+ provided
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/httpClient-4.x-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/httpClient-4.x-plugin/pom.xml
index 98fd86a893..1a2b0a7102 100644
--- a/apm-sniffer/apm-sdk-plugin/httpClient-4.x-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/httpClient-4.x-plugin/pom.xml
@@ -26,15 +26,6 @@
4.3
provided
-
-
org.apache.logging.log4j
log4j-core
--
GitLab