From f187931e8df54cfb717b8f7edb1e086b0d30d377 Mon Sep 17 00:00:00 2001 From: pengys5 <8082209@qq.com> Date: Sun, 10 Sep 2017 01:25:34 +0800 Subject: [PATCH] zk add node listener --- .../grpc/AgentJVMGRPCDataListener.java | 6 +- .../grpc/AgentRegisterGRPCDataListener.java | 7 ++- .../jetty/AgentRegisterJettyDataListener.java | 7 ++- .../jetty/AgentServerJettyDataListener.java | 6 +- .../handler/AgentStreamGRPCServerHandler.java | 4 +- .../AgentStreamJettyServerHandler.java | 4 +- .../jetty/handler/UIJettyServerHandler.java | 4 +- .../grpc/AgentStreamGRPCDataListener.java | 7 ++- .../jetty/AgentStreamJettyDataListener.java | 7 ++- .../src/main/resources/application.yml | 62 +++++++++++-------- .../apm/collector/client/grpc/GRPCClient.java | 8 +-- .../zookeeper/ClusterZKDataMonitor.java | 19 +++--- .../core/cluster/ClusterDataListener.java | 20 +++--- .../ClusterModuleRegistrationReader.java | 4 +- .../noderef/NodeReferenceDataDefine.java | 28 ++++----- .../define/register/InstanceDataDefine.java | 1 + .../ServiceReferenceDataDefine.java | 32 +++++----- .../stream/StreamModuleGroupDefine.java | 2 +- .../stream/grpc/StreamGRPCDataListener.java | 60 ++++++++---------- .../stream/grpc/StreamGRPCModuleDefine.java | 2 +- .../handler/RemoteCommonServiceHandler.java | 6 +- .../stream/worker/RemoteWorkerRef.java | 28 ++++++--- .../collector/stream/worker/WorkerRefs.java | 5 ++ .../ui/jetty/UIJettyDataListener.java | 7 ++- 24 files changed, 195 insertions(+), 141 deletions(-) diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/grpc/AgentJVMGRPCDataListener.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/grpc/AgentJVMGRPCDataListener.java index b25fcdb8dd..b60afc4fd7 100644 --- a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/grpc/AgentJVMGRPCDataListener.java +++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/grpc/AgentJVMGRPCDataListener.java @@ -15,7 +15,11 @@ public class AgentJVMGRPCDataListener extends ClusterDataListener { return PATH; } - @Override public void addressChangedNotify() { + @Override public void serverJoinNotify(String serverAddress) { + + } + + @Override public void serverQuitNotify() { } } diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCDataListener.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCDataListener.java index 6ff6004d98..a8ad38c14e 100644 --- a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCDataListener.java +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/grpc/AgentRegisterGRPCDataListener.java @@ -15,6 +15,11 @@ public class AgentRegisterGRPCDataListener extends ClusterDataListener { return PATH; } - @Override public void addressChangedNotify() { + @Override public void serverJoinNotify(String serverAddress) { + + } + + @Override public void serverQuitNotify() { + } } diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/jetty/AgentRegisterJettyDataListener.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/jetty/AgentRegisterJettyDataListener.java index a15ff860e6..d56a04930e 100644 --- a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/jetty/AgentRegisterJettyDataListener.java +++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/jetty/AgentRegisterJettyDataListener.java @@ -15,6 +15,11 @@ public class AgentRegisterJettyDataListener extends ClusterDataListener { return PATH; } - @Override public void addressChangedNotify() { + @Override public void serverJoinNotify(String serverAddress) { + + } + + @Override public void serverQuitNotify() { + } } diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyDataListener.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyDataListener.java index 1ce44e9d83..9c4f254680 100644 --- a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyDataListener.java +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/AgentServerJettyDataListener.java @@ -13,7 +13,11 @@ public class AgentServerJettyDataListener extends ClusterDataListener { return ClusterModuleDefine.BASE_CATALOG + "." + AgentServerModuleGroupDefine.GROUP_NAME + "." + AgentServerJettyModuleDefine.MODULE_NAME; } - @Override public void addressChangedNotify() { + @Override public void serverJoinNotify(String serverAddress) { + + } + + @Override public void serverQuitNotify() { } } diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamGRPCServerHandler.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamGRPCServerHandler.java index f1cd494617..652af12c38 100644 --- a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamGRPCServerHandler.java +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamGRPCServerHandler.java @@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentserver.jetty.handler; import com.google.gson.JsonArray; import com.google.gson.JsonElement; -import java.util.List; +import java.util.Set; import javax.servlet.http.HttpServletRequest; import org.skywalking.apm.collector.agentstream.grpc.AgentStreamGRPCDataListener; import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; @@ -23,7 +23,7 @@ public class AgentStreamGRPCServerHandler extends JettyHandler { @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader(); - List servers = reader.read(AgentStreamGRPCDataListener.PATH); + Set servers = reader.read(AgentStreamGRPCDataListener.PATH); JsonArray serverArray = new JsonArray(); servers.forEach(server -> serverArray.add(server)); return serverArray; diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamJettyServerHandler.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamJettyServerHandler.java index bfcecdbf71..40a1e339a2 100644 --- a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamJettyServerHandler.java +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/AgentStreamJettyServerHandler.java @@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentserver.jetty.handler; import com.google.gson.JsonArray; import com.google.gson.JsonElement; -import java.util.List; +import java.util.Set; import javax.servlet.http.HttpServletRequest; import org.skywalking.apm.collector.agentstream.jetty.AgentStreamJettyDataListener; import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; @@ -23,7 +23,7 @@ public class AgentStreamJettyServerHandler extends JettyHandler { @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader(); - List servers = reader.read(AgentStreamJettyDataListener.PATH); + Set servers = reader.read(AgentStreamJettyDataListener.PATH); JsonArray serverArray = new JsonArray(); servers.forEach(server -> { serverArray.add(server); diff --git a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/UIJettyServerHandler.java b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/UIJettyServerHandler.java index d38868f38f..9c81388bcc 100644 --- a/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/UIJettyServerHandler.java +++ b/apm-collector/apm-collector-agentserver/src/main/java/org/skywalking/apm/collector/agentserver/jetty/handler/UIJettyServerHandler.java @@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentserver.jetty.handler; import com.google.gson.JsonArray; import com.google.gson.JsonElement; -import java.util.List; +import java.util.Set; import javax.servlet.http.HttpServletRequest; import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; import org.skywalking.apm.collector.core.cluster.ClusterModuleContext; @@ -23,7 +23,7 @@ public class UIJettyServerHandler extends JettyHandler { @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader(); - List servers = reader.read(UIJettyDataListener.PATH); + Set servers = reader.read(UIJettyDataListener.PATH); JsonArray serverArray = new JsonArray(); servers.forEach(server -> { serverArray.add(server); diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCDataListener.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCDataListener.java index dd3428ec3b..12b28c8a00 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCDataListener.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCDataListener.java @@ -15,6 +15,11 @@ public class AgentStreamGRPCDataListener extends ClusterDataListener { return PATH; } - @Override public void addressChangedNotify() { + @Override public void serverJoinNotify(String serverAddress) { + + } + + @Override public void serverQuitNotify() { + } } diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyDataListener.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyDataListener.java index 375c84754a..4b2774dd45 100644 --- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyDataListener.java +++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/jetty/AgentStreamJettyDataListener.java @@ -15,6 +15,11 @@ public class AgentStreamJettyDataListener extends ClusterDataListener { return PATH; } - @Override public void addressChangedNotify() { + @Override public void serverJoinNotify(String serverAddress) { + + } + + @Override public void serverQuitNotify() { + } } diff --git a/apm-collector/apm-collector-boot/src/main/resources/application.yml b/apm-collector/apm-collector-boot/src/main/resources/application.yml index 2477d79ece..0ff32ca040 100644 --- a/apm-collector/apm-collector-boot/src/main/resources/application.yml +++ b/apm-collector/apm-collector-boot/src/main/resources/application.yml @@ -2,34 +2,42 @@ cluster: zookeeper: hostPort: localhost:2181 sessionTimeout: 100000 -#agent_server: -# jetty: -# host: localhost -# port: 10800 -# context_path: / -#agent_register: -# grpc: -# host: localhost -# port: 11800 -# jetty: -# host: localhost -# port: 12800 -# context_path: / -#agent_stream: -# grpc: -# host: localhost -# port: 11800 -# jetty: -# host: localhost -# port: 12800 -# context_path: / -#ui: -# jetty: -# host: localhost -# port: 12800 -# context_path: / +agent_server: + jetty: + host: localhost + port: 10800 + context_path: / +agent_register: + grpc: + host: localhost + port: 11800 + jetty: + host: localhost + port: 12800 + context_path: / +agent_stream: + grpc: + host: localhost + port: 11800 + jetty: + host: localhost + port: 12800 + context_path: / +agent_jvm: + grpc: + host: localhost + port: 11800 +ui: + jetty: + host: localhost + port: 12800 + context_path: / +collector_inside: + grpc: + host: localhost + port: 11800 storage: elasticsearch: cluster_name: CollectorDBCluster cluster_transport_sniffer: true - cluster_nodes: localhost:9300 \ No newline at end of file + cluster_nodes: localhost:9300 diff --git a/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/grpc/GRPCClient.java b/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/grpc/GRPCClient.java index 8788a54720..19a077514f 100644 --- a/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/grpc/GRPCClient.java +++ b/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/grpc/GRPCClient.java @@ -4,16 +4,12 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.ClientException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @author pengys5 */ public class GRPCClient implements Client { - private final Logger logger = LoggerFactory.getLogger(GRPCClient.class); - private final String host; private final int port; @@ -32,4 +28,8 @@ public class GRPCClient implements Client { public ManagedChannel getChannel() { return channel; } + + @Override public String toString() { + return host + ":" + port; + } } diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java index a2e6c862fd..da6d99e11c 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java @@ -7,6 +7,7 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.Stat; import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient; import org.skywalking.apm.collector.client.zookeeper.ZookeeperClientException; import org.skywalking.apm.collector.client.zookeeper.util.PathUtils; @@ -36,19 +37,22 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher { } @Override public void process(WatchedEvent event) { - logger.debug("changed path {}", event.getPath()); + logger.info("changed path {}, event type: {}", event.getPath(), event.getType().name()); if (listeners.containsKey(event.getPath())) { - List paths = null; + List paths; try { paths = client.getChildren(event.getPath(), true); - listeners.get(event.getPath()).clearData(); if (CollectionUtils.isNotEmpty(paths)) { for (String serverPath : paths) { - byte[] data = client.getData(event.getPath() + "/" + serverPath, false, null); + Stat stat = new Stat(); + byte[] data = client.getData(event.getPath() + "/" + serverPath, true, stat); String dataStr = new String(data); - logger.debug("path children has been changed, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr); - listeners.get(event.getPath()).addAddress(serverPath + dataStr); - listeners.get(event.getPath()).addressChangedNotify(); + if (stat.getCzxid() == stat.getMzxid()) { + logger.info("path children has been created, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr); + listeners.get(event.getPath()).serverJoinNotify(serverPath + dataStr); + } else { + logger.info("path children has been changed, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr); + } } } } catch (ZookeeperClientException e) { @@ -73,7 +77,6 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher { client.getChildren(path, true); String serverPath = path + "/" + value.getHostPort(); - listener.addAddress(value.getHostPort() + contextPath); if (client.exists(serverPath, false) == null) { setData(serverPath, contextPath); diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListener.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListener.java index 463a33adfa..6613678c56 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListener.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterDataListener.java @@ -1,7 +1,7 @@ package org.skywalking.apm.collector.core.cluster; -import java.util.LinkedList; -import java.util.List; +import java.util.HashSet; +import java.util.Set; import org.skywalking.apm.collector.core.framework.Listener; /** @@ -9,10 +9,10 @@ import org.skywalking.apm.collector.core.framework.Listener; */ public abstract class ClusterDataListener implements Listener { - private List addresses; + private Set addresses; public ClusterDataListener() { - addresses = new LinkedList<>(); + addresses = new HashSet<>(); } public abstract String path(); @@ -21,13 +21,15 @@ public abstract class ClusterDataListener implements Listener { addresses.add(address); } - public final List getAddresses() { - return addresses; + public final void removeAddress(String address) { + addresses.remove(address); } - public final void clearData() { - addresses.clear(); + public final Set getAddresses() { + return addresses; } - public abstract void addressChangedNotify(); + public abstract void serverJoinNotify(String serverAddress); + + public abstract void serverQuitNotify(); } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationReader.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationReader.java index ddfcdd56ad..67e1b82a73 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationReader.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/cluster/ClusterModuleRegistrationReader.java @@ -1,6 +1,6 @@ package org.skywalking.apm.collector.core.cluster; -import java.util.List; +import java.util.Set; import org.skywalking.apm.collector.core.client.DataMonitor; /** @@ -14,7 +14,7 @@ public abstract class ClusterModuleRegistrationReader { this.dataMonitor = dataMonitor; } - public final List read(String path) { + public final Set read(String path) { return dataMonitor.getListener(path).getAddresses(); } } diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/define/noderef/NodeReferenceDataDefine.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/define/noderef/NodeReferenceDataDefine.java index a1d514e3e5..66c61a0d96 100644 --- a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/define/noderef/NodeReferenceDataDefine.java +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/define/noderef/NodeReferenceDataDefine.java @@ -34,7 +34,7 @@ public class NodeReferenceDataDefine extends DataDefine { @Override public Object deserialize(RemoteData remoteData) { String id = remoteData.getDataStrings(0); - int applicationId = remoteData.getDataIntegers(0); + int frontApplicationId = remoteData.getDataIntegers(0); int behindApplicationId = remoteData.getDataIntegers(1); String behindPeer = remoteData.getDataStrings(1); int s1LTE = remoteData.getDataIntegers(2); @@ -44,23 +44,23 @@ public class NodeReferenceDataDefine extends DataDefine { int summary = remoteData.getDataIntegers(6); int error = remoteData.getDataIntegers(7); long timeBucket = remoteData.getDataLongs(0); - return new NodeReference(id, applicationId, behindApplicationId, behindPeer, s1LTE, s3LTE, s5LTE, s5GT, summary, error, timeBucket); + return new NodeReference(id, frontApplicationId, behindApplicationId, behindPeer, s1LTE, s3LTE, s5LTE, s5GT, summary, error, timeBucket); } @Override public RemoteData serialize(Object object) { - NodeReference nodeReference = (NodeReference)object; + Data data = (Data)object; RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(nodeReference.getId()); - builder.addDataIntegers(nodeReference.getFrontApplicationId()); - builder.addDataIntegers(nodeReference.getBehindApplicationId()); - builder.addDataStrings(nodeReference.getBehindPeer()); - builder.addDataIntegers(nodeReference.getS1LTE()); - builder.addDataIntegers(nodeReference.getS3LTE()); - builder.addDataIntegers(nodeReference.getS5LTE()); - builder.addDataIntegers(nodeReference.getS5GT()); - builder.addDataIntegers(nodeReference.getSummary()); - builder.addDataIntegers(nodeReference.getError()); - builder.addDataLongs(nodeReference.getTimeBucket()); + builder.addDataStrings(data.getDataString(0)); + builder.addDataIntegers(data.getDataInteger(0)); + builder.addDataIntegers(data.getDataInteger(1)); + builder.addDataStrings(data.getDataString(1)); + builder.addDataIntegers(data.getDataInteger(2)); + builder.addDataIntegers(data.getDataInteger(3)); + builder.addDataIntegers(data.getDataInteger(4)); + builder.addDataIntegers(data.getDataInteger(5)); + builder.addDataIntegers(data.getDataInteger(6)); + builder.addDataIntegers(data.getDataInteger(7)); + builder.addDataLongs(data.getDataLong(0)); return builder.build(); } diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/define/register/InstanceDataDefine.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/define/register/InstanceDataDefine.java index 20c6043343..b079aec931 100644 --- a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/define/register/InstanceDataDefine.java +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/define/register/InstanceDataDefine.java @@ -43,6 +43,7 @@ public class InstanceDataDefine extends DataDefine { builder.addDataStrings(instance.getId()); builder.addDataIntegers(instance.getApplicationId()); builder.addDataStrings(instance.getAgentUUID()); + builder.addDataIntegers(instance.getInstanceId()); builder.addDataLongs(instance.getRegisterTime()); builder.addDataLongs(instance.getHeartBeatTime()); builder.addDataStrings(instance.getOsInfo()); diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/define/serviceref/ServiceReferenceDataDefine.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/define/serviceref/ServiceReferenceDataDefine.java index e0ad40e9f7..2e1358d184 100644 --- a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/define/serviceref/ServiceReferenceDataDefine.java +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/define/serviceref/ServiceReferenceDataDefine.java @@ -57,23 +57,23 @@ public class ServiceReferenceDataDefine extends DataDefine { } @Override public RemoteData serialize(Object object) { - ServiceReference serviceReference = (ServiceReference)object; + Data data = (Data)object; RemoteData.Builder builder = RemoteData.newBuilder(); - builder.addDataStrings(serviceReference.getId()); - builder.addDataIntegers(serviceReference.getEntryServiceId()); - builder.addDataStrings(serviceReference.getEntryServiceName()); - builder.addDataIntegers(serviceReference.getFrontServiceId()); - builder.addDataStrings(serviceReference.getFrontServiceName()); - builder.addDataIntegers(serviceReference.getBehindServiceId()); - builder.addDataStrings(serviceReference.getBehindServiceName()); - builder.addDataLongs(serviceReference.getS1Lte()); - builder.addDataLongs(serviceReference.getS3Lte()); - builder.addDataLongs(serviceReference.getS5Lte()); - builder.addDataLongs(serviceReference.getS5Gt()); - builder.addDataLongs(serviceReference.getSummary()); - builder.addDataLongs(serviceReference.getError()); - builder.addDataLongs(serviceReference.getCostSummary()); - builder.addDataLongs(serviceReference.getTimeBucket()); + builder.addDataStrings(data.getDataString(0)); + builder.addDataIntegers(data.getDataInteger(0)); + builder.addDataStrings(data.getDataString(1)); + builder.addDataIntegers(data.getDataInteger(1)); + builder.addDataStrings(data.getDataString(2)); + builder.addDataIntegers(data.getDataInteger(2)); + builder.addDataStrings(data.getDataString(3)); + builder.addDataLongs(data.getDataLong(0)); + builder.addDataLongs(data.getDataLong(1)); + builder.addDataLongs(data.getDataLong(2)); + builder.addDataLongs(data.getDataLong(3)); + builder.addDataLongs(data.getDataLong(4)); + builder.addDataLongs(data.getDataLong(5)); + builder.addDataLongs(data.getDataLong(6)); + builder.addDataLongs(data.getDataLong(7)); return builder.build(); } diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleGroupDefine.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleGroupDefine.java index e320a817da..99b11bbbb7 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleGroupDefine.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/StreamModuleGroupDefine.java @@ -9,7 +9,7 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller; */ public class StreamModuleGroupDefine implements ModuleGroupDefine { - public static final String GROUP_NAME = "stream"; + public static final String GROUP_NAME = "collector_inside"; @Override public String name() { return GROUP_NAME; diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCDataListener.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCDataListener.java index 89f7e64a34..781e65d55b 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCDataListener.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCDataListener.java @@ -1,7 +1,6 @@ package org.skywalking.apm.collector.stream.grpc; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.skywalking.apm.collector.client.grpc.GRPCClient; import org.skywalking.apm.collector.cluster.ClusterModuleDefine; @@ -10,7 +9,6 @@ import org.skywalking.apm.collector.core.cluster.ClusterDataListener; import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.stream.StreamModuleContext; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; -import org.skywalking.apm.collector.stream.worker.RemoteWorkerRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,47 +26,39 @@ public class StreamGRPCDataListener extends ClusterDataListener { } private Map clients = new HashMap<>(); - private Map workerRefs = new HashMap<>(); - @Override public void addressChangedNotify() { + @Override public void serverJoinNotify(String serverAddress) { String selfAddress = StreamGRPCConfig.HOST + ":" + StreamGRPCConfig.PORT; - StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME); - List addresses = getAddresses(); - clients.keySet().forEach(address -> { - if (!addresses.contains(address)) { - context.getClusterWorkerContext().remove(workerRefs.get(address)); - workerRefs.remove(address); + if (!clients.containsKey(serverAddress)) { + logger.info("new address: {}, create this address remote worker reference", serverAddress); + String[] hostPort = serverAddress.split(":"); + GRPCClient client = new GRPCClient(hostPort[0], Integer.valueOf(hostPort[1])); + try { + client.initialize(); + } catch (ClientException e) { + e.printStackTrace(); } - }); - - for (String address : addresses) { - if (!clients.containsKey(address)) { - logger.debug("new address: {}, create this address remote worker reference", address); - String[] hostPort = address.split(":"); - GRPCClient client = new GRPCClient(hostPort[0], Integer.valueOf(hostPort[1])); - try { - client.initialize(); - } catch (ClientException e) { - e.printStackTrace(); - } - clients.put(address, client); + clients.put(serverAddress, client); - if (selfAddress.equals(address)) { - context.getClusterWorkerContext().getProviders().forEach(provider -> { - logger.debug("create remote worker self reference, role: {}", provider.role().roleName()); - provider.create(); - }); - } else { - context.getClusterWorkerContext().getProviders().forEach(provider -> { - logger.debug("create remote worker reference, role: {}", provider.role().roleName()); - RemoteWorkerRef workerRef = provider.create(client); - }); - } + if (selfAddress.equals(serverAddress)) { + context.getClusterWorkerContext().getProviders().forEach(provider -> { + logger.info("create remote worker self reference, role: {}", provider.role().roleName()); + provider.create(); + }); } else { - logger.debug("address: {} had remote worker reference, ignore", address); + context.getClusterWorkerContext().getProviders().forEach(provider -> { + logger.info("create remote worker reference, role: {}", provider.role().roleName()); + provider.create(client); + }); } + } else { + logger.info("address: {} had remote worker reference, ignore", serverAddress); } } + + @Override public void serverQuitNotify() { + + } } diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCModuleDefine.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCModuleDefine.java index 3a81664765..1cfa930920 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCModuleDefine.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/StreamGRPCModuleDefine.java @@ -20,7 +20,7 @@ import org.skywalking.apm.collector.stream.grpc.handler.RemoteCommonServiceHandl */ public class StreamGRPCModuleDefine extends StreamModuleDefine { - public static final String MODULE_NAME = "stream"; + public static final String MODULE_NAME = "grpc"; @Override public String name() { return MODULE_NAME; diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/handler/RemoteCommonServiceHandler.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/handler/RemoteCommonServiceHandler.java index d1d92ebd39..4494116a06 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/handler/RemoteCommonServiceHandler.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/grpc/handler/RemoteCommonServiceHandler.java @@ -10,8 +10,6 @@ import org.skywalking.apm.collector.server.grpc.GRPCHandler; import org.skywalking.apm.collector.stream.StreamModuleContext; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; import org.skywalking.apm.collector.stream.worker.Role; -import org.skywalking.apm.collector.stream.worker.WorkerInvokeException; -import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,10 +28,10 @@ public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCo StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME); Role role = context.getClusterWorkerContext().getRole(roleName); - Object object = role.dataDefine().deserialize(remoteData); try { + Object object = role.dataDefine().deserialize(remoteData); context.getClusterWorkerContext().lookupInSide(roleName).tell(object); - } catch (WorkerNotFoundException | WorkerInvokeException e) { + } catch (Throwable e) { logger.error(e.getMessage(), e); } } diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerRef.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerRef.java index 707230f6ca..86d4ec85c2 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerRef.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/RemoteWorkerRef.java @@ -2,6 +2,7 @@ package org.skywalking.apm.collector.stream.worker; import io.grpc.stub.StreamObserver; import org.skywalking.apm.collector.client.grpc.GRPCClient; +import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.remote.grpc.proto.Empty; import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc; import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; @@ -20,12 +21,14 @@ public class RemoteWorkerRef extends WorkerRef { private final RemoteCommonServiceGrpc.RemoteCommonServiceStub stub; private StreamObserver streamObserver; private final AbstractRemoteWorker remoteWorker; + private final String address; public RemoteWorkerRef(Role role, AbstractRemoteWorker remoteWorker) { super(role); this.remoteWorker = remoteWorker; this.acrossJVM = false; this.stub = null; + this.address = Const.EMPTY_STRING; } public RemoteWorkerRef(Role role, GRPCClient client) { @@ -33,19 +36,23 @@ public class RemoteWorkerRef extends WorkerRef { this.remoteWorker = null; this.acrossJVM = true; this.stub = RemoteCommonServiceGrpc.newStub(client.getChannel()); + this.address = client.toString(); createStreamObserver(); } @Override public void tell(Object message) throws WorkerInvokeException { if (acrossJVM) { - RemoteData remoteData = getRole().dataDefine().serialize(message); - - RemoteMessage.Builder builder = RemoteMessage.newBuilder(); - builder.setWorkerRole(getRole().roleName()); - builder.setRemoteData(remoteData); - - streamObserver.onNext(builder.build()); + try { + RemoteData remoteData = getRole().dataDefine().serialize(message); + RemoteMessage.Builder builder = RemoteMessage.newBuilder(); + builder.setWorkerRole(getRole().roleName()); + builder.setRemoteData(remoteData); + + streamObserver.onNext(builder.build()); + } catch (Throwable e) { + logger.error(e.getMessage(), e); + } } else { remoteWorker.allocateJob(message); } @@ -113,4 +120,11 @@ public class RemoteWorkerRef extends WorkerRef { } } } + + @Override public String toString() { + StringBuilder toString = new StringBuilder(); + toString.append("acrossJVM: ").append(acrossJVM); + toString.append(", address: ").append(address); + return toString.toString(); + } } diff --git a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerRefs.java b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerRefs.java index 1fb3dfeda1..9f2f66f65c 100644 --- a/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerRefs.java +++ b/apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/WorkerRefs.java @@ -29,6 +29,11 @@ public class WorkerRefs { public void tell(Object message) throws WorkerInvokeException { logger.debug("WorkerSelector instance of {}", workerSelector.getClass()); + workerRefs.forEach(workerRef -> { + if (workerRef instanceof RemoteWorkerRef) { + logger.info("message hashcode: {}, select workers: {}", message.hashCode(), workerRef.toString()); + } + }); workerSelector.select(workerRefs, message).tell(message); } } diff --git a/apm-collector/apm-collector-ui/src/main/java/org/skywalking/apm/collector/ui/jetty/UIJettyDataListener.java b/apm-collector/apm-collector-ui/src/main/java/org/skywalking/apm/collector/ui/jetty/UIJettyDataListener.java index 00a1104dd1..e3d4bffb2e 100644 --- a/apm-collector/apm-collector-ui/src/main/java/org/skywalking/apm/collector/ui/jetty/UIJettyDataListener.java +++ b/apm-collector/apm-collector-ui/src/main/java/org/skywalking/apm/collector/ui/jetty/UIJettyDataListener.java @@ -15,6 +15,11 @@ public class UIJettyDataListener extends ClusterDataListener { return PATH; } - @Override public void addressChangedNotify() { + @Override public void serverJoinNotify(String serverAddress) { + + } + + @Override public void serverQuitNotify() { + } } -- GitLab