提交 a470b286 编写于 作者: P pengys5

fixed the first item in #254: lookup the grpc services listening port

上级 d1591e3a
......@@ -2,14 +2,20 @@ package org.skywalking.apm.collector;
import akka.actor.ActorSystem;
import akka.actor.Props;
import java.io.IOException;
import java.util.ServiceLoader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractClusterWorkerProvider;
import org.skywalking.apm.collector.actor.AbstractLocalWorkerProvider;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LookUp;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.UsedRoleNameException;
import org.skywalking.apm.collector.cluster.WorkersListener;
import org.skywalking.apm.collector.config.ConfigInitializer;
import java.io.IOException;
import java.util.ServiceLoader;
import org.skywalking.apm.collector.rpc.RPCAddressListener;
/**
* @author pengys5
......@@ -38,6 +44,7 @@ public class CollectorSystem {
private void createListener() {
clusterContext.getAkkaSystem().actorOf(Props.create(WorkersListener.class, clusterContext), WorkersListener.WORK_NAME);
clusterContext.getAkkaSystem().actorOf(Props.create(RPCAddressListener.class, clusterContext), RPCAddressListener.WORK_NAME);
}
private void createClusterWorkers() throws ProviderNotFoundException {
......
......@@ -8,6 +8,9 @@ import akka.cluster.MemberStatus;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.cluster.WorkerListenerMessage;
import org.skywalking.apm.collector.cluster.WorkersListener;
import org.skywalking.apm.collector.rpc.RPCAddress;
import org.skywalking.apm.collector.rpc.RPCAddressListener;
import org.skywalking.apm.collector.rpc.RPCAddressListenerMessage;
import org.skywalking.apm.collector.log.LogManager;
/**
......@@ -56,10 +59,12 @@ public abstract class AbstractClusterWorker extends AbstractWorker {
private Cluster cluster;
private final AbstractClusterWorker ownerWorker;
private final RPCAddress RPCAddress;
public WorkerWithAkka(AbstractClusterWorker ownerWorker) {
public WorkerWithAkka(AbstractClusterWorker ownerWorker, RPCAddress RPCAddress) {
this.ownerWorker = ownerWorker;
cluster = Cluster.get(getContext().system());
this.RPCAddress = RPCAddress;
}
@Override
......@@ -108,6 +113,11 @@ public abstract class AbstractClusterWorker extends AbstractWorker {
logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
getContext().actorSelection(member.address() + "/user/" + WorkersListener.WORK_NAME).tell(registerMessage, getSelf());
}
if (member.hasRole(RPCAddressListener.WORK_NAME) && RPCAddress != null) {
RPCAddressListenerMessage.ConfigMessage configMessage = new RPCAddressListenerMessage.ConfigMessage(RPCAddress);
logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
getContext().actorSelection(member.address() + "/user/" + RPCAddressListener.WORK_NAME).tell(configMessage, getSelf());
}
}
}
}
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.actor;
import akka.actor.ActorRef;
import akka.actor.Props;
import org.skywalking.apm.collector.rpc.RPCAddress;
/**
* The <code>AbstractClusterWorkerProvider</code> implementations represent providers,
......@@ -20,6 +21,10 @@ public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWor
*/
public abstract int workerNum();
public RPCAddress config() {
return null;
}
/**
* Create the worker instance into akka system, the akka system will control the cluster worker life cycle.
*
......@@ -35,10 +40,14 @@ public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWor
T clusterWorker = workerInstance(getClusterContext());
clusterWorker.preStart();
ActorRef actorRef = getClusterContext().getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker), role().roleName() + "_" + num);
ActorRef actorRef = getClusterContext().getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker, config()), role().roleName() + "_" + num);
ClusterWorkerRef workerRef = new ClusterWorkerRef(actorRef, role());
getClusterContext().put(workerRef);
if (config() != null) {
getClusterContext().getRpcContext().putAddress("Self", config());
}
return workerRef;
}
}
package org.skywalking.apm.collector.actor;
import akka.actor.ActorSystem;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.rpc.RPCAddressContext;
/**
* @author pengys5
......@@ -15,6 +15,7 @@ public class ClusterWorkerContext extends WorkerContext {
private final ActorSystem akkaSystem;
private Map<String, AbstractWorkerProvider> providers = new ConcurrentHashMap<>();
private RPCAddressContext rpcContext = new RPCAddressContext();
public ClusterWorkerContext(ActorSystem akkaSystem) {
this.akkaSystem = akkaSystem;
......@@ -43,4 +44,8 @@ public class ClusterWorkerContext extends WorkerContext {
providers.put(provider.role().roleName(), provider);
}
}
public RPCAddressContext getRpcContext() {
return rpcContext;
}
}
package org.skywalking.apm.collector.cluster;
import java.io.Serializable;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.Role;
import java.io.Serializable;
/**
* <code>WorkerListenerMessage</code> is a message just for the worker
* implementation of the {@link AbstractWorker}
......
......@@ -30,7 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class WorkersListener extends UntypedActor {
public static final String WORK_NAME = "WorkersListener";
private static final Logger logger = LogManager.getFormatterLogger(WorkersListener.class);
private final Logger logger = LogManager.getFormatterLogger(WorkersListener.class);
private final ClusterWorkerContext clusterContext;
private Cluster cluster = Cluster.get(getContext().system());
private Map<ActorRef, ClusterWorkerRef> relation = new ConcurrentHashMap<>();
......
package org.skywalking.apm.collector.rpc;
/**
* @author pengys5
*/
public class RPCAddress {
private final String address;
private final int port;
public RPCAddress(String address, int port) {
this.address = address;
this.port = port;
}
public String getAddress() {
return address;
}
public int getPort() {
return port;
}
}
package org.skywalking.apm.collector.rpc;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author pengys5
*/
public class RPCAddressContext {
private Map<String, RPCAddress> rpcAddresses = new ConcurrentHashMap<>();
public Collection<RPCAddress> rpcAddressCollection() {
return rpcAddresses.values();
}
public void putAddress(String ownerAddress, RPCAddress rpcAddress) {
rpcAddresses.put(ownerAddress, rpcAddress);
}
public void removeAddress(String ownerAddress) {
rpcAddresses.remove(ownerAddress);
}
}
package org.skywalking.apm.collector.rpc;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
/**
* @author pengys5
*/
public class RPCAddressListener extends UntypedActor {
private final Logger logger = LogManager.getFormatterLogger(RPCAddressListener.class);
public static final String WORK_NAME = "RPCAddressListener";
private final ClusterWorkerContext clusterContext;
private Cluster cluster = Cluster.get(getContext().system());
public RPCAddressListener(ClusterWorkerContext clusterContext) {
this.clusterContext = clusterContext;
}
@Override
public void preStart() throws Exception {
cluster.subscribe(getSelf(), ClusterEvent.UnreachableMember.class);
}
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof RPCAddressListenerMessage.ConfigMessage) {
RPCAddressListenerMessage.ConfigMessage configMessage = (RPCAddressListenerMessage.ConfigMessage)message;
ActorRef sender = getSender();
logger.info("address: %s, port: %s", configMessage.getConfig().getAddress(), configMessage.getConfig().getPort());
String ownerAddress = sender.path().address().hostPort();
clusterContext.getRpcContext().putAddress(ownerAddress, configMessage.getConfig());
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated)message;
clusterContext.getRpcContext().removeAddress(terminated.getActor().path().address().hostPort());
} else if (message instanceof ClusterEvent.UnreachableMember) {
ClusterEvent.UnreachableMember unreachableMember = (ClusterEvent.UnreachableMember)message;
clusterContext.getRpcContext().removeAddress(unreachableMember.member().address().hostPort());
} else {
unhandled(message);
}
}
}
package org.skywalking.apm.collector.rpc;
import java.io.Serializable;
/**
* @author pengys5
*/
public class RPCAddressListenerMessage {
public static class ConfigMessage implements Serializable {
private final RPCAddress config;
public ConfigMessage(RPCAddress config) {
this.config = config;
}
public RPCAddress getConfig() {
return config;
}
}
}
......@@ -6,6 +6,7 @@ package org.skywalking.apm.collector.worker.config;
public class GRPCConfig {
public static class GRPC {
public static String HOSTNAME = "";
public static String PORT = "";
}
}
......@@ -13,6 +13,9 @@ public class GRPCConfigProvider implements ConfigProvider {
}
@Override public void cliArgs() {
if (!StringUtil.isEmpty(System.getProperty("grpc.HOSTNAME"))) {
GRPCConfig.GRPC.HOSTNAME = System.getProperty("grpc.HOSTNAME");
}
if (!StringUtil.isEmpty(System.getProperty("grpc.PORT"))) {
GRPCConfig.GRPC.PORT = System.getProperty("grpc.PORT");
}
......
......@@ -6,6 +6,12 @@ package org.skywalking.apm.collector.worker.config;
public class WorkerConfig {
public static class WorkerNum {
public static class GRPC {
public static class GRPCAddressRegister {
public static int VALUE = 1;
}
}
public static class Node {
public static class NodeCompAgg {
public static int VALUE = 2;
......
package org.skywalking.apm.collector.worker.globaltrace;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Arrays;
import java.util.Map;
......@@ -30,13 +31,17 @@ public class GlobalTraceGetWithGlobalId extends AbstractGet {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(GlobalTraceSearchWithGlobalId.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("globalId")) {
throw new IllegalArgumentException("the request parameter must contains globalId");
}
......
package org.skywalking.apm.collector.worker.grpcserver;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import java.util.Map;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.httpserver.AbstractGet;
import org.skywalking.apm.collector.worker.httpserver.AbstractGetProvider;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
/**
* @author pengys5
*/
public class GRPCAddressGet extends AbstractGet {
protected GRPCAddressGet(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override protected Class<JsonArray> responseClass() {
return JsonArray.class;
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
((ClusterWorkerContext)getClusterContext()).getRpcContext().rpcAddressCollection().forEach(rpcAddress -> {
((JsonArray)response).add(rpcAddress.getAddress() + ":" + rpcAddress.getPort());
});
}
public static class Factory extends AbstractGetProvider<GRPCAddressGet> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public GRPCAddressGet workerInstance(ClusterWorkerContext clusterContext) {
return new GRPCAddressGet(role(), clusterContext, new LocalWorkerContext());
}
@Override
public String servletPath() {
return "/grpc/addresses";
}
}
public enum Role implements org.skywalking.apm.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return GRPCAddressGet.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package org.skywalking.apm.collector.worker.grpcserver;
import org.skywalking.apm.collector.actor.AbstractClusterWorker;
import org.skywalking.apm.collector.actor.AbstractClusterWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.rpc.RPCAddress;
import org.skywalking.apm.collector.worker.config.GRPCConfig;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
/**
* @author pengys5
*/
public class GRPCAddressRegister extends AbstractClusterWorker {
GRPCAddressRegister(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override public void preStart() throws ProviderNotFoundException {
}
@Override protected void onWork(Object message) throws WorkerException {
}
public static class Factory extends AbstractClusterWorkerProvider<GRPCAddressRegister> {
public static Factory INSTANCE = new Factory();
@Override
public GRPCAddressRegister.Role role() {
return Role.INSTANCE;
}
@Override public int workerNum() {
return WorkerConfig.WorkerNum.GRPC.GRPCAddressRegister.VALUE;
}
@Override public RPCAddress config() {
return new RPCAddress(GRPCConfig.GRPC.HOSTNAME, Integer.valueOf(GRPCConfig.GRPC.PORT));
}
@Override public GRPCAddressRegister workerInstance(ClusterWorkerContext clusterContext) {
return new GRPCAddressRegister(role(), clusterContext, new LocalWorkerContext());
}
}
public enum Role implements org.skywalking.apm.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return GRPCAddressRegister.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonObject;
import com.google.gson.JsonElement;
import java.io.IOException;
import java.util.Map;
import javax.servlet.ServletException;
......@@ -20,7 +20,7 @@ import org.skywalking.apm.collector.actor.WorkerNotFoundException;
* The <code>AbstractGet</code> implementations represent workers, which called by the server to allow a servlet to
* handle a GET request.
*
* <p>verride the {@link #onReceive(Map, JsonObject)} method to support a search service.
* <p>verride the {@link #onReceive(Map, JsonElement)} method to support a search service.
*
* @author pengys5
* @since v3.0-2017
......
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonObject;
import com.google.gson.JsonElement;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map;
......@@ -31,6 +31,8 @@ public abstract class AbstractServlet extends AbstractLocalSyncWorker {
super(role, clusterContext, selfContext);
}
protected abstract Class<? extends JsonElement> responseClass();
/**
* Override this method to implementing business logic.
*
......@@ -42,12 +44,16 @@ public abstract class AbstractServlet extends AbstractLocalSyncWorker {
*/
@Override protected void onWork(Object parameter,
Object response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonObject resJson = new JsonObject();
try {
JsonElement resJson = responseClass().newInstance();
onReceive((Map<String, String[]>)parameter, resJson);
onSuccessResponse((HttpServletResponse)response, resJson);
} catch (IOException e) {
logger.error(e.getMessage(), e);
} catch (InstantiationException e) {
logger.error(e.getMessage(), e);
} catch (IllegalAccessException e) {
logger.error(e.getMessage(), e);
}
}
......@@ -55,23 +61,22 @@ public abstract class AbstractServlet extends AbstractLocalSyncWorker {
* Override this method to implementing business logic.
*
* @param parameter {@link Map}, get the request parameter by key.
* @param response {@link JsonObject}, set the response data as json object.
* @param response {@link JsonElement}, set the response data as json object.
* @throws ArgumentsParseException if the key could not contains in parameter
* @throws WorkerInvokeException if any error is detected when call(or ask) worker
* @throws WorkerNotFoundException if the worker reference could not found in context.
*/
protected abstract void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException;
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException;
/**
* Set the worker response and the success status into the servlet response object
*
* @param response {@link HttpServletResponse} object that contains the response the servlet reply to the client
* @param resJson {@link JsonObject} object that contains the response from worker
* @param resJson {@link JsonElement} that contains the response from worker
* @throws IOException if any error is detected when the servlet handles the response.
*/
protected void onSuccessResponse(HttpServletResponse response, JsonObject resJson) throws IOException {
resJson.addProperty("isSuccess", true);
protected void onSuccessResponse(HttpServletResponse response, JsonElement resJson) throws IOException {
reply(response, resJson, HttpServletResponse.SC_OK);
}
......@@ -82,14 +87,15 @@ public abstract class AbstractServlet extends AbstractLocalSyncWorker {
* @param response {@link HttpServletResponse} object that contains the response the servlet reply to the client
*/
protected void onErrorResponse(Exception exception, HttpServletResponse response) {
JsonObject resJson = new JsonObject();
resJson.addProperty("isSuccess", false);
resJson.addProperty("reason", exception.getMessage());
response.setHeader("reason", exception.getMessage());
try {
reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
reply(response, responseClass().newInstance(), HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
} catch (IOException e) {
logger.error(e.getMessage(), e);
} catch (IllegalAccessException e) {
logger.error(e.getMessage(), e);
} catch (InstantiationException e) {
logger.error(e.getMessage(), e);
}
}
......@@ -97,11 +103,11 @@ public abstract class AbstractServlet extends AbstractLocalSyncWorker {
* Build the response head and body
*
* @param response {@link HttpServletResponse} object that contains the response the servlet reply to the client
* @param resJson {@link JsonObject} object that contains the response from worker
* @param resJson {@link JsonElement} that contains the response from worker
* @param status http status code
* @throws IOException if an input or output error is detected when the servlet handles the response
*/
private void reply(HttpServletResponse response, JsonObject resJson, int status) throws IOException {
private void reply(HttpServletResponse response, JsonElement resJson, int status) throws IOException {
response.setContentType("text/json");
response.setCharacterEncoding("utf-8");
response.setStatus(status);
......
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.BufferedReader;
import java.io.IOException;
......@@ -58,13 +59,13 @@ public abstract class AbstractStreamPost extends AbstractServlet {
* Override the default implementation, forbidden to call this method.
*
* @param parameter {@link Map}, get the request parameter by key.
* @param response {@link JsonObject}, set the response data as json object.
* @param response {@link JsonElement}, set the response data as json object.
* @throws ArgumentsParseException if the key could not contains in parameter
* @throws WorkerInvokeException if any error is detected when call(or ask) worker
* @throws WorkerNotFoundException if the worker reference could not found in context.
*/
@Override final protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
throw new WorkerInvokeException("Use the other method with buffer reader parameter");
}
......
package org.skywalking.apm.collector.worker.noderef;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Arrays;
import java.util.Map;
......@@ -30,13 +31,17 @@ public class NodeRefResSumGetGroupWithTimeSlice extends AbstractGet {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeRefResSumGroupWithTimeSlice.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("startTime") || !parameter.containsKey("endTime") || !parameter.containsKey("timeSliceType")) {
throw new ArgumentsParseException("the request parameter must contains startTime,endTime,timeSliceType");
}
......
package org.skywalking.apm.collector.worker.segment;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Arrays;
import java.util.Map;
......@@ -30,13 +31,17 @@ public class SegmentTopGet extends AbstractGet {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(SegmentTopSearch.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("startTime") || !parameter.containsKey("endTime") || !parameter.containsKey("from") || !parameter.containsKey("limit")) {
throw new ArgumentsParseException("the request parameter must contains startTime, endTime, from, limit");
}
......
package org.skywalking.apm.collector.worker.span;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Arrays;
import java.util.Map;
......@@ -30,13 +31,17 @@ public class SpanGetWithId extends AbstractGet {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(SpanSearchWithId.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("segId") || !parameter.containsKey("spanId")) {
throw new ArgumentsParseException("the request parameter must contains segId, spanId");
}
......
package org.skywalking.apm.collector.worker.tracedag;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Arrays;
import java.util.Map;
......@@ -34,6 +35,10 @@ public class TraceDagGetWithTimeSlice extends AbstractGet {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeCompLoad.WorkerRole.INSTANCE).create(this);
......@@ -43,7 +48,7 @@ public class TraceDagGetWithTimeSlice extends AbstractGet {
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("startTime") || !parameter.containsKey("endTime") || !parameter.containsKey("timeSliceType")) {
throw new ArgumentsParseException("the request parameter must contains startTime,endTime,timeSliceType");
}
......@@ -87,7 +92,7 @@ public class TraceDagGetWithTimeSlice extends AbstractGet {
JsonObject result = getBuilder().build(compResponse.get(Const.RESULT).getAsJsonArray(), nodeMappingResponse.get(Const.RESULT).getAsJsonArray(),
nodeRefResponse.get(Const.RESULT).getAsJsonArray(), resSumResponse.get(Const.RESULT).getAsJsonArray());
response.add(Const.RESULT, result);
((JsonObject)response).add(Const.RESULT, result);
}
private JsonObject getNewResponse() {
......
org.skywalking.apm.collector.worker.grpcserver.GRPCAddressRegister$Factory
org.skywalking.apm.collector.worker.globaltrace.persistence.GlobalTraceAgg$Factory
org.skywalking.apm.collector.worker.noderef.persistence.NodeRefDayAgg$Factory
......
......@@ -2,4 +2,6 @@ org.skywalking.apm.collector.worker.noderef.NodeRefResSumGetGroupWithTimeSlice$F
org.skywalking.apm.collector.worker.segment.SegmentTopGet$Factory
org.skywalking.apm.collector.worker.globaltrace.GlobalTraceGetWithGlobalId$Factory
org.skywalking.apm.collector.worker.span.SpanGetWithId$Factory
org.skywalking.apm.collector.worker.tracedag.TraceDagGetWithTimeSlice$Factory
\ No newline at end of file
org.skywalking.apm.collector.worker.tracedag.TraceDagGetWithTimeSlice$Factory
org.skywalking.apm.collector.worker.grpcserver.GRPCAddressGet$Factory
\ No newline at end of file
......@@ -6,7 +6,7 @@ cluster.current.port = 11800
# In this version, all members have same roles, and everyone of them is listening the status of others.
#The routers do not send message to nodes, which is unreachable, caused by network trouble, jvm crash or any other reasons.
cluster.current.roles=WorkersListener
cluster.current.roles=WorkersListener,RPCAddressListener
#Initial contact points of the cluster, e.g. seed_nodes = 127.0.0.1:11800, 127.0.0.1:11801.
#The nodes to join automatically at startup.
......@@ -40,6 +40,7 @@ http.port=12800
http.contextPath=/
# GRPC services
grpc.hostname=127.0.0.1
grpc.port=22800
# Cache size of analysis worker. The value determines whether sending to next worker and clear, or not.
......
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Map;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
......@@ -19,13 +20,17 @@ public class TestAbstractGet extends AbstractGet {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
}
......
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Map;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
......@@ -19,13 +20,17 @@ public class TestAbstractPost extends AbstractPost {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonElement response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
}
......
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.BufferedReader;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
......@@ -19,6 +20,10 @@ public class TestAbstractStreamPost extends AbstractStreamPost {
super(role, clusterContext, selfContext);
}
@Override protected Class<? extends JsonElement> responseClass() {
return JsonObject.class;
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册