提交 f187931e 编写于 作者: P pengys5

zk add node listener

上级 ce6d4c8d
...@@ -15,7 +15,11 @@ public class AgentJVMGRPCDataListener extends ClusterDataListener { ...@@ -15,7 +15,11 @@ public class AgentJVMGRPCDataListener extends ClusterDataListener {
return PATH; return PATH;
} }
@Override public void addressChangedNotify() { @Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify() {
} }
} }
...@@ -15,6 +15,11 @@ public class AgentRegisterGRPCDataListener extends ClusterDataListener { ...@@ -15,6 +15,11 @@ public class AgentRegisterGRPCDataListener extends ClusterDataListener {
return PATH; return PATH;
} }
@Override public void addressChangedNotify() { @Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify() {
} }
} }
...@@ -15,6 +15,11 @@ public class AgentRegisterJettyDataListener extends ClusterDataListener { ...@@ -15,6 +15,11 @@ public class AgentRegisterJettyDataListener extends ClusterDataListener {
return PATH; return PATH;
} }
@Override public void addressChangedNotify() { @Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify() {
} }
} }
...@@ -13,7 +13,11 @@ public class AgentServerJettyDataListener extends ClusterDataListener { ...@@ -13,7 +13,11 @@ public class AgentServerJettyDataListener extends ClusterDataListener {
return ClusterModuleDefine.BASE_CATALOG + "." + AgentServerModuleGroupDefine.GROUP_NAME + "." + AgentServerJettyModuleDefine.MODULE_NAME; return ClusterModuleDefine.BASE_CATALOG + "." + AgentServerModuleGroupDefine.GROUP_NAME + "." + AgentServerJettyModuleDefine.MODULE_NAME;
} }
@Override public void addressChangedNotify() { @Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify() {
} }
} }
...@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentserver.jetty.handler; ...@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentserver.jetty.handler;
import com.google.gson.JsonArray; import com.google.gson.JsonArray;
import com.google.gson.JsonElement; import com.google.gson.JsonElement;
import java.util.List; import java.util.Set;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agentstream.grpc.AgentStreamGRPCDataListener; import org.skywalking.apm.collector.agentstream.grpc.AgentStreamGRPCDataListener;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
...@@ -23,7 +23,7 @@ public class AgentStreamGRPCServerHandler extends JettyHandler { ...@@ -23,7 +23,7 @@ public class AgentStreamGRPCServerHandler extends JettyHandler {
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader(); ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
List<String> servers = reader.read(AgentStreamGRPCDataListener.PATH); Set<String> servers = reader.read(AgentStreamGRPCDataListener.PATH);
JsonArray serverArray = new JsonArray(); JsonArray serverArray = new JsonArray();
servers.forEach(server -> serverArray.add(server)); servers.forEach(server -> serverArray.add(server));
return serverArray; return serverArray;
......
...@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentserver.jetty.handler; ...@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentserver.jetty.handler;
import com.google.gson.JsonArray; import com.google.gson.JsonArray;
import com.google.gson.JsonElement; import com.google.gson.JsonElement;
import java.util.List; import java.util.Set;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agentstream.jetty.AgentStreamJettyDataListener; import org.skywalking.apm.collector.agentstream.jetty.AgentStreamJettyDataListener;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
...@@ -23,7 +23,7 @@ public class AgentStreamJettyServerHandler extends JettyHandler { ...@@ -23,7 +23,7 @@ public class AgentStreamJettyServerHandler extends JettyHandler {
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader(); ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
List<String> servers = reader.read(AgentStreamJettyDataListener.PATH); Set<String> servers = reader.read(AgentStreamJettyDataListener.PATH);
JsonArray serverArray = new JsonArray(); JsonArray serverArray = new JsonArray();
servers.forEach(server -> { servers.forEach(server -> {
serverArray.add(server); serverArray.add(server);
......
...@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentserver.jetty.handler; ...@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentserver.jetty.handler;
import com.google.gson.JsonArray; import com.google.gson.JsonArray;
import com.google.gson.JsonElement; import com.google.gson.JsonElement;
import java.util.List; import java.util.Set;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine; import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext; import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
...@@ -23,7 +23,7 @@ public class UIJettyServerHandler extends JettyHandler { ...@@ -23,7 +23,7 @@ public class UIJettyServerHandler extends JettyHandler {
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader(); ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
List<String> servers = reader.read(UIJettyDataListener.PATH); Set<String> servers = reader.read(UIJettyDataListener.PATH);
JsonArray serverArray = new JsonArray(); JsonArray serverArray = new JsonArray();
servers.forEach(server -> { servers.forEach(server -> {
serverArray.add(server); serverArray.add(server);
......
...@@ -15,6 +15,11 @@ public class AgentStreamGRPCDataListener extends ClusterDataListener { ...@@ -15,6 +15,11 @@ public class AgentStreamGRPCDataListener extends ClusterDataListener {
return PATH; return PATH;
} }
@Override public void addressChangedNotify() { @Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify() {
} }
} }
...@@ -15,6 +15,11 @@ public class AgentStreamJettyDataListener extends ClusterDataListener { ...@@ -15,6 +15,11 @@ public class AgentStreamJettyDataListener extends ClusterDataListener {
return PATH; return PATH;
} }
@Override public void addressChangedNotify() { @Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify() {
} }
} }
...@@ -2,34 +2,42 @@ cluster: ...@@ -2,34 +2,42 @@ cluster:
zookeeper: zookeeper:
hostPort: localhost:2181 hostPort: localhost:2181
sessionTimeout: 100000 sessionTimeout: 100000
#agent_server: agent_server:
# jetty: jetty:
# host: localhost host: localhost
# port: 10800 port: 10800
# context_path: / context_path: /
#agent_register: agent_register:
# grpc: grpc:
# host: localhost host: localhost
# port: 11800 port: 11800
# jetty: jetty:
# host: localhost host: localhost
# port: 12800 port: 12800
# context_path: / context_path: /
#agent_stream: agent_stream:
# grpc: grpc:
# host: localhost host: localhost
# port: 11800 port: 11800
# jetty: jetty:
# host: localhost host: localhost
# port: 12800 port: 12800
# context_path: / context_path: /
#ui: agent_jvm:
# jetty: grpc:
# host: localhost host: localhost
# port: 12800 port: 11800
# context_path: / ui:
jetty:
host: localhost
port: 12800
context_path: /
collector_inside:
grpc:
host: localhost
port: 11800
storage: storage:
elasticsearch: elasticsearch:
cluster_name: CollectorDBCluster cluster_name: CollectorDBCluster
cluster_transport_sniffer: true cluster_transport_sniffer: true
cluster_nodes: localhost:9300 cluster_nodes: localhost:9300
\ No newline at end of file
...@@ -4,16 +4,12 @@ import io.grpc.ManagedChannel; ...@@ -4,16 +4,12 @@ import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.client.ClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* @author pengys5 * @author pengys5
*/ */
public class GRPCClient implements Client { public class GRPCClient implements Client {
private final Logger logger = LoggerFactory.getLogger(GRPCClient.class);
private final String host; private final String host;
private final int port; private final int port;
...@@ -32,4 +28,8 @@ public class GRPCClient implements Client { ...@@ -32,4 +28,8 @@ public class GRPCClient implements Client {
public ManagedChannel getChannel() { public ManagedChannel getChannel() {
return channel; return channel;
} }
@Override public String toString() {
return host + ":" + port;
}
} }
...@@ -7,6 +7,7 @@ import org.apache.zookeeper.CreateMode; ...@@ -7,6 +7,7 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient; import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClientException; import org.skywalking.apm.collector.client.zookeeper.ZookeeperClientException;
import org.skywalking.apm.collector.client.zookeeper.util.PathUtils; import org.skywalking.apm.collector.client.zookeeper.util.PathUtils;
...@@ -36,19 +37,22 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher { ...@@ -36,19 +37,22 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
} }
@Override public void process(WatchedEvent event) { @Override public void process(WatchedEvent event) {
logger.debug("changed path {}", event.getPath()); logger.info("changed path {}, event type: {}", event.getPath(), event.getType().name());
if (listeners.containsKey(event.getPath())) { if (listeners.containsKey(event.getPath())) {
List<String> paths = null; List<String> paths;
try { try {
paths = client.getChildren(event.getPath(), true); paths = client.getChildren(event.getPath(), true);
listeners.get(event.getPath()).clearData();
if (CollectionUtils.isNotEmpty(paths)) { if (CollectionUtils.isNotEmpty(paths)) {
for (String serverPath : paths) { for (String serverPath : paths) {
byte[] data = client.getData(event.getPath() + "/" + serverPath, false, null); Stat stat = new Stat();
byte[] data = client.getData(event.getPath() + "/" + serverPath, true, stat);
String dataStr = new String(data); String dataStr = new String(data);
logger.debug("path children has been changed, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr); if (stat.getCzxid() == stat.getMzxid()) {
listeners.get(event.getPath()).addAddress(serverPath + dataStr); logger.info("path children has been created, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr);
listeners.get(event.getPath()).addressChangedNotify(); listeners.get(event.getPath()).serverJoinNotify(serverPath + dataStr);
} else {
logger.info("path children has been changed, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr);
}
} }
} }
} catch (ZookeeperClientException e) { } catch (ZookeeperClientException e) {
...@@ -73,7 +77,6 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher { ...@@ -73,7 +77,6 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
client.getChildren(path, true); client.getChildren(path, true);
String serverPath = path + "/" + value.getHostPort(); String serverPath = path + "/" + value.getHostPort();
listener.addAddress(value.getHostPort() + contextPath);
if (client.exists(serverPath, false) == null) { if (client.exists(serverPath, false) == null) {
setData(serverPath, contextPath); setData(serverPath, contextPath);
......
package org.skywalking.apm.collector.core.cluster; package org.skywalking.apm.collector.core.cluster;
import java.util.LinkedList; import java.util.HashSet;
import java.util.List; import java.util.Set;
import org.skywalking.apm.collector.core.framework.Listener; import org.skywalking.apm.collector.core.framework.Listener;
/** /**
...@@ -9,10 +9,10 @@ import org.skywalking.apm.collector.core.framework.Listener; ...@@ -9,10 +9,10 @@ import org.skywalking.apm.collector.core.framework.Listener;
*/ */
public abstract class ClusterDataListener implements Listener { public abstract class ClusterDataListener implements Listener {
private List<String> addresses; private Set<String> addresses;
public ClusterDataListener() { public ClusterDataListener() {
addresses = new LinkedList<>(); addresses = new HashSet<>();
} }
public abstract String path(); public abstract String path();
...@@ -21,13 +21,15 @@ public abstract class ClusterDataListener implements Listener { ...@@ -21,13 +21,15 @@ public abstract class ClusterDataListener implements Listener {
addresses.add(address); addresses.add(address);
} }
public final List<String> getAddresses() { public final void removeAddress(String address) {
return addresses; addresses.remove(address);
} }
public final void clearData() { public final Set<String> getAddresses() {
addresses.clear(); return addresses;
} }
public abstract void addressChangedNotify(); public abstract void serverJoinNotify(String serverAddress);
public abstract void serverQuitNotify();
} }
package org.skywalking.apm.collector.core.cluster; package org.skywalking.apm.collector.core.cluster;
import java.util.List; import java.util.Set;
import org.skywalking.apm.collector.core.client.DataMonitor; import org.skywalking.apm.collector.core.client.DataMonitor;
/** /**
...@@ -14,7 +14,7 @@ public abstract class ClusterModuleRegistrationReader { ...@@ -14,7 +14,7 @@ public abstract class ClusterModuleRegistrationReader {
this.dataMonitor = dataMonitor; this.dataMonitor = dataMonitor;
} }
public final List<String> read(String path) { public final Set<String> read(String path) {
return dataMonitor.getListener(path).getAddresses(); return dataMonitor.getListener(path).getAddresses();
} }
} }
...@@ -34,7 +34,7 @@ public class NodeReferenceDataDefine extends DataDefine { ...@@ -34,7 +34,7 @@ public class NodeReferenceDataDefine extends DataDefine {
@Override public Object deserialize(RemoteData remoteData) { @Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0); String id = remoteData.getDataStrings(0);
int applicationId = remoteData.getDataIntegers(0); int frontApplicationId = remoteData.getDataIntegers(0);
int behindApplicationId = remoteData.getDataIntegers(1); int behindApplicationId = remoteData.getDataIntegers(1);
String behindPeer = remoteData.getDataStrings(1); String behindPeer = remoteData.getDataStrings(1);
int s1LTE = remoteData.getDataIntegers(2); int s1LTE = remoteData.getDataIntegers(2);
...@@ -44,23 +44,23 @@ public class NodeReferenceDataDefine extends DataDefine { ...@@ -44,23 +44,23 @@ public class NodeReferenceDataDefine extends DataDefine {
int summary = remoteData.getDataIntegers(6); int summary = remoteData.getDataIntegers(6);
int error = remoteData.getDataIntegers(7); int error = remoteData.getDataIntegers(7);
long timeBucket = remoteData.getDataLongs(0); long timeBucket = remoteData.getDataLongs(0);
return new NodeReference(id, applicationId, behindApplicationId, behindPeer, s1LTE, s3LTE, s5LTE, s5GT, summary, error, timeBucket); return new NodeReference(id, frontApplicationId, behindApplicationId, behindPeer, s1LTE, s3LTE, s5LTE, s5GT, summary, error, timeBucket);
} }
@Override public RemoteData serialize(Object object) { @Override public RemoteData serialize(Object object) {
NodeReference nodeReference = (NodeReference)object; Data data = (Data)object;
RemoteData.Builder builder = RemoteData.newBuilder(); RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(nodeReference.getId()); builder.addDataStrings(data.getDataString(0));
builder.addDataIntegers(nodeReference.getFrontApplicationId()); builder.addDataIntegers(data.getDataInteger(0));
builder.addDataIntegers(nodeReference.getBehindApplicationId()); builder.addDataIntegers(data.getDataInteger(1));
builder.addDataStrings(nodeReference.getBehindPeer()); builder.addDataStrings(data.getDataString(1));
builder.addDataIntegers(nodeReference.getS1LTE()); builder.addDataIntegers(data.getDataInteger(2));
builder.addDataIntegers(nodeReference.getS3LTE()); builder.addDataIntegers(data.getDataInteger(3));
builder.addDataIntegers(nodeReference.getS5LTE()); builder.addDataIntegers(data.getDataInteger(4));
builder.addDataIntegers(nodeReference.getS5GT()); builder.addDataIntegers(data.getDataInteger(5));
builder.addDataIntegers(nodeReference.getSummary()); builder.addDataIntegers(data.getDataInteger(6));
builder.addDataIntegers(nodeReference.getError()); builder.addDataIntegers(data.getDataInteger(7));
builder.addDataLongs(nodeReference.getTimeBucket()); builder.addDataLongs(data.getDataLong(0));
return builder.build(); return builder.build();
} }
......
...@@ -43,6 +43,7 @@ public class InstanceDataDefine extends DataDefine { ...@@ -43,6 +43,7 @@ public class InstanceDataDefine extends DataDefine {
builder.addDataStrings(instance.getId()); builder.addDataStrings(instance.getId());
builder.addDataIntegers(instance.getApplicationId()); builder.addDataIntegers(instance.getApplicationId());
builder.addDataStrings(instance.getAgentUUID()); builder.addDataStrings(instance.getAgentUUID());
builder.addDataIntegers(instance.getInstanceId());
builder.addDataLongs(instance.getRegisterTime()); builder.addDataLongs(instance.getRegisterTime());
builder.addDataLongs(instance.getHeartBeatTime()); builder.addDataLongs(instance.getHeartBeatTime());
builder.addDataStrings(instance.getOsInfo()); builder.addDataStrings(instance.getOsInfo());
......
...@@ -57,23 +57,23 @@ public class ServiceReferenceDataDefine extends DataDefine { ...@@ -57,23 +57,23 @@ public class ServiceReferenceDataDefine extends DataDefine {
} }
@Override public RemoteData serialize(Object object) { @Override public RemoteData serialize(Object object) {
ServiceReference serviceReference = (ServiceReference)object; Data data = (Data)object;
RemoteData.Builder builder = RemoteData.newBuilder(); RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(serviceReference.getId()); builder.addDataStrings(data.getDataString(0));
builder.addDataIntegers(serviceReference.getEntryServiceId()); builder.addDataIntegers(data.getDataInteger(0));
builder.addDataStrings(serviceReference.getEntryServiceName()); builder.addDataStrings(data.getDataString(1));
builder.addDataIntegers(serviceReference.getFrontServiceId()); builder.addDataIntegers(data.getDataInteger(1));
builder.addDataStrings(serviceReference.getFrontServiceName()); builder.addDataStrings(data.getDataString(2));
builder.addDataIntegers(serviceReference.getBehindServiceId()); builder.addDataIntegers(data.getDataInteger(2));
builder.addDataStrings(serviceReference.getBehindServiceName()); builder.addDataStrings(data.getDataString(3));
builder.addDataLongs(serviceReference.getS1Lte()); builder.addDataLongs(data.getDataLong(0));
builder.addDataLongs(serviceReference.getS3Lte()); builder.addDataLongs(data.getDataLong(1));
builder.addDataLongs(serviceReference.getS5Lte()); builder.addDataLongs(data.getDataLong(2));
builder.addDataLongs(serviceReference.getS5Gt()); builder.addDataLongs(data.getDataLong(3));
builder.addDataLongs(serviceReference.getSummary()); builder.addDataLongs(data.getDataLong(4));
builder.addDataLongs(serviceReference.getError()); builder.addDataLongs(data.getDataLong(5));
builder.addDataLongs(serviceReference.getCostSummary()); builder.addDataLongs(data.getDataLong(6));
builder.addDataLongs(serviceReference.getTimeBucket()); builder.addDataLongs(data.getDataLong(7));
return builder.build(); return builder.build();
} }
......
...@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller; ...@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
*/ */
public class StreamModuleGroupDefine implements ModuleGroupDefine { public class StreamModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "stream"; public static final String GROUP_NAME = "collector_inside";
@Override public String name() { @Override public String name() {
return GROUP_NAME; return GROUP_NAME;
......
package org.skywalking.apm.collector.stream.grpc; package org.skywalking.apm.collector.stream.grpc;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.skywalking.apm.collector.client.grpc.GRPCClient; import org.skywalking.apm.collector.client.grpc.GRPCClient;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine; import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
...@@ -10,7 +9,6 @@ import org.skywalking.apm.collector.core.cluster.ClusterDataListener; ...@@ -10,7 +9,6 @@ import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext; import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.RemoteWorkerRef;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -28,47 +26,39 @@ public class StreamGRPCDataListener extends ClusterDataListener { ...@@ -28,47 +26,39 @@ public class StreamGRPCDataListener extends ClusterDataListener {
} }
private Map<String, GRPCClient> clients = new HashMap<>(); private Map<String, GRPCClient> clients = new HashMap<>();
private Map<String, RemoteWorkerRef> workerRefs = new HashMap<>();
@Override public void addressChangedNotify() { @Override public void serverJoinNotify(String serverAddress) {
String selfAddress = StreamGRPCConfig.HOST + ":" + StreamGRPCConfig.PORT; String selfAddress = StreamGRPCConfig.HOST + ":" + StreamGRPCConfig.PORT;
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME); StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
List<String> addresses = getAddresses(); if (!clients.containsKey(serverAddress)) {
clients.keySet().forEach(address -> { logger.info("new address: {}, create this address remote worker reference", serverAddress);
if (!addresses.contains(address)) { String[] hostPort = serverAddress.split(":");
context.getClusterWorkerContext().remove(workerRefs.get(address)); GRPCClient client = new GRPCClient(hostPort[0], Integer.valueOf(hostPort[1]));
workerRefs.remove(address); try {
client.initialize();
} catch (ClientException e) {
e.printStackTrace();
} }
}); clients.put(serverAddress, client);
for (String address : addresses) {
if (!clients.containsKey(address)) {
logger.debug("new address: {}, create this address remote worker reference", address);
String[] hostPort = address.split(":");
GRPCClient client = new GRPCClient(hostPort[0], Integer.valueOf(hostPort[1]));
try {
client.initialize();
} catch (ClientException e) {
e.printStackTrace();
}
clients.put(address, client);
if (selfAddress.equals(address)) { if (selfAddress.equals(serverAddress)) {
context.getClusterWorkerContext().getProviders().forEach(provider -> { context.getClusterWorkerContext().getProviders().forEach(provider -> {
logger.debug("create remote worker self reference, role: {}", provider.role().roleName()); logger.info("create remote worker self reference, role: {}", provider.role().roleName());
provider.create(); provider.create();
}); });
} else {
context.getClusterWorkerContext().getProviders().forEach(provider -> {
logger.debug("create remote worker reference, role: {}", provider.role().roleName());
RemoteWorkerRef workerRef = provider.create(client);
});
}
} else { } else {
logger.debug("address: {} had remote worker reference, ignore", address); context.getClusterWorkerContext().getProviders().forEach(provider -> {
logger.info("create remote worker reference, role: {}", provider.role().roleName());
provider.create(client);
});
} }
} else {
logger.info("address: {} had remote worker reference, ignore", serverAddress);
} }
} }
@Override public void serverQuitNotify() {
}
} }
...@@ -20,7 +20,7 @@ import org.skywalking.apm.collector.stream.grpc.handler.RemoteCommonServiceHandl ...@@ -20,7 +20,7 @@ import org.skywalking.apm.collector.stream.grpc.handler.RemoteCommonServiceHandl
*/ */
public class StreamGRPCModuleDefine extends StreamModuleDefine { public class StreamGRPCModuleDefine extends StreamModuleDefine {
public static final String MODULE_NAME = "stream"; public static final String MODULE_NAME = "grpc";
@Override public String name() { @Override public String name() {
return MODULE_NAME; return MODULE_NAME;
......
...@@ -10,8 +10,6 @@ import org.skywalking.apm.collector.server.grpc.GRPCHandler; ...@@ -10,8 +10,6 @@ import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.collector.stream.StreamModuleContext; import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.Role; import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -30,10 +28,10 @@ public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCo ...@@ -30,10 +28,10 @@ public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCo
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME); StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
Role role = context.getClusterWorkerContext().getRole(roleName); Role role = context.getClusterWorkerContext().getRole(roleName);
Object object = role.dataDefine().deserialize(remoteData);
try { try {
Object object = role.dataDefine().deserialize(remoteData);
context.getClusterWorkerContext().lookupInSide(roleName).tell(object); context.getClusterWorkerContext().lookupInSide(roleName).tell(object);
} catch (WorkerNotFoundException | WorkerInvokeException e) { } catch (Throwable e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} }
} }
......
...@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.stream.worker; ...@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.stream.worker;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.client.grpc.GRPCClient; import org.skywalking.apm.collector.client.grpc.GRPCClient;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.remote.grpc.proto.Empty; import org.skywalking.apm.collector.remote.grpc.proto.Empty;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc; import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
...@@ -20,12 +21,14 @@ public class RemoteWorkerRef extends WorkerRef { ...@@ -20,12 +21,14 @@ public class RemoteWorkerRef extends WorkerRef {
private final RemoteCommonServiceGrpc.RemoteCommonServiceStub stub; private final RemoteCommonServiceGrpc.RemoteCommonServiceStub stub;
private StreamObserver<RemoteMessage> streamObserver; private StreamObserver<RemoteMessage> streamObserver;
private final AbstractRemoteWorker remoteWorker; private final AbstractRemoteWorker remoteWorker;
private final String address;
public RemoteWorkerRef(Role role, AbstractRemoteWorker remoteWorker) { public RemoteWorkerRef(Role role, AbstractRemoteWorker remoteWorker) {
super(role); super(role);
this.remoteWorker = remoteWorker; this.remoteWorker = remoteWorker;
this.acrossJVM = false; this.acrossJVM = false;
this.stub = null; this.stub = null;
this.address = Const.EMPTY_STRING;
} }
public RemoteWorkerRef(Role role, GRPCClient client) { public RemoteWorkerRef(Role role, GRPCClient client) {
...@@ -33,19 +36,23 @@ public class RemoteWorkerRef extends WorkerRef { ...@@ -33,19 +36,23 @@ public class RemoteWorkerRef extends WorkerRef {
this.remoteWorker = null; this.remoteWorker = null;
this.acrossJVM = true; this.acrossJVM = true;
this.stub = RemoteCommonServiceGrpc.newStub(client.getChannel()); this.stub = RemoteCommonServiceGrpc.newStub(client.getChannel());
this.address = client.toString();
createStreamObserver(); createStreamObserver();
} }
@Override @Override
public void tell(Object message) throws WorkerInvokeException { public void tell(Object message) throws WorkerInvokeException {
if (acrossJVM) { if (acrossJVM) {
RemoteData remoteData = getRole().dataDefine().serialize(message); try {
RemoteData remoteData = getRole().dataDefine().serialize(message);
RemoteMessage.Builder builder = RemoteMessage.newBuilder(); RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setWorkerRole(getRole().roleName()); builder.setWorkerRole(getRole().roleName());
builder.setRemoteData(remoteData); builder.setRemoteData(remoteData);
streamObserver.onNext(builder.build()); streamObserver.onNext(builder.build());
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
} else { } else {
remoteWorker.allocateJob(message); remoteWorker.allocateJob(message);
} }
...@@ -113,4 +120,11 @@ public class RemoteWorkerRef extends WorkerRef { ...@@ -113,4 +120,11 @@ public class RemoteWorkerRef extends WorkerRef {
} }
} }
} }
@Override public String toString() {
StringBuilder toString = new StringBuilder();
toString.append("acrossJVM: ").append(acrossJVM);
toString.append(", address: ").append(address);
return toString.toString();
}
} }
...@@ -29,6 +29,11 @@ public class WorkerRefs<T extends WorkerRef> { ...@@ -29,6 +29,11 @@ public class WorkerRefs<T extends WorkerRef> {
public void tell(Object message) throws WorkerInvokeException { public void tell(Object message) throws WorkerInvokeException {
logger.debug("WorkerSelector instance of {}", workerSelector.getClass()); logger.debug("WorkerSelector instance of {}", workerSelector.getClass());
workerRefs.forEach(workerRef -> {
if (workerRef instanceof RemoteWorkerRef) {
logger.info("message hashcode: {}, select workers: {}", message.hashCode(), workerRef.toString());
}
});
workerSelector.select(workerRefs, message).tell(message); workerSelector.select(workerRefs, message).tell(message);
} }
} }
...@@ -15,6 +15,11 @@ public class UIJettyDataListener extends ClusterDataListener { ...@@ -15,6 +15,11 @@ public class UIJettyDataListener extends ClusterDataListener {
return PATH; return PATH;
} }
@Override public void addressChangedNotify() { @Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify() {
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册