提交 a1cc0e1f 编写于 作者: P peng-yongsheng

Refactor remote module.

上级 2ede0e8a
...@@ -41,51 +41,75 @@ public abstract class Data extends AbstractHashMessage { ...@@ -41,51 +41,75 @@ public abstract class Data extends AbstractHashMessage {
this.dataBytes = new byte[byteColumns.length][]; this.dataBytes = new byte[byteColumns.length][];
} }
protected void setDataString(int position, String value) { public int getDataStringsCount() {
return dataStrings.length;
}
public int getDataLongsCount() {
return dataLongs.length;
}
public int getDataDoublesCount() {
return dataDoubles.length;
}
public int getDataIntegersCount() {
return dataIntegers.length;
}
public int getDataBooleansCount() {
return dataBooleans.length;
}
public int getDataBytesCount() {
return dataBytes.length;
}
public void setDataString(int position, String value) {
dataStrings[position] = value; dataStrings[position] = value;
} }
protected void setDataLong(int position, Long value) { public void setDataLong(int position, Long value) {
dataLongs[position] = value; dataLongs[position] = value;
} }
protected void setDataDouble(int position, Double value) { public void setDataDouble(int position, Double value) {
dataDoubles[position] = value; dataDoubles[position] = value;
} }
protected void setDataInteger(int position, Integer value) { public void setDataInteger(int position, Integer value) {
dataIntegers[position] = value; dataIntegers[position] = value;
} }
protected void setDataBoolean(int position, Boolean value) { public void setDataBoolean(int position, Boolean value) {
dataBooleans[position] = value; dataBooleans[position] = value;
} }
protected void setDataBytes(int position, byte[] dataBytes) { public void setDataBytes(int position, byte[] dataBytes) {
this.dataBytes[position] = dataBytes; this.dataBytes[position] = dataBytes;
} }
protected String getDataString(int position) { public String getDataString(int position) {
return dataStrings[position]; return dataStrings[position];
} }
protected Long getDataLong(int position) { public Long getDataLong(int position) {
return dataLongs[position]; return dataLongs[position];
} }
protected Double getDataDouble(int position) { public Double getDataDouble(int position) {
return dataDoubles[position]; return dataDoubles[position];
} }
protected Integer getDataInteger(int position) { public Integer getDataInteger(int position) {
return dataIntegers[position]; return dataIntegers[position];
} }
protected Boolean getDataBoolean(int position) { public Boolean getDataBoolean(int position) {
return dataBooleans[position]; return dataBooleans[position];
} }
protected byte[] getDataBytes(int position) { public byte[] getDataBytes(int position) {
return dataBytes[position]; return dataBytes[position];
} }
......
...@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.remote; ...@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.remote;
import org.skywalking.apm.collector.core.module.Module; import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.remote.service.RemoteClientService; import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.remote.service.RemoteServerService; import org.skywalking.apm.collector.remote.service.RemoteServerService;
import org.skywalking.apm.collector.remote.service.RemoteSerializeService;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -34,6 +35,6 @@ public class RemoteModule extends Module { ...@@ -34,6 +35,6 @@ public class RemoteModule extends Module {
} }
@Override public Class[] services() { @Override public Class[] services() {
return new Class[] {RemoteServerService.class, RemoteClientService.class}; return new Class[] {RemoteServerService.class, RemoteClientService.class, RemoteSerializeService.class};
} }
} }
...@@ -23,6 +23,8 @@ import org.skywalking.apm.collector.core.data.Data; ...@@ -23,6 +23,8 @@ import org.skywalking.apm.collector.core.data.Data;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public interface DataReceiver { public interface DataReceiver<Output extends Data> {
void receive(String roleName, Data data); Output output(int graphId, int nodeId);
void receive(Output data);
} }
...@@ -24,5 +24,5 @@ import org.skywalking.apm.collector.core.data.Data; ...@@ -24,5 +24,5 @@ import org.skywalking.apm.collector.core.data.Data;
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public interface RemoteClient { public interface RemoteClient {
void send(String roleName, Data data, int remoteDataMappingId); void send(int graphId, int nodeId, Data data);
} }
...@@ -23,9 +23,6 @@ import org.skywalking.apm.collector.core.data.Data; ...@@ -23,9 +23,6 @@ import org.skywalking.apm.collector.core.data.Data;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public interface SerializableAndDeserialize<RemoteData, Builder> { public interface RemoteDeserializeService<RemoteData> {
void deserialize(RemoteData remoteData, Data data); void deserialize(RemoteData remoteData, Data data);
Builder serialize(Data data);
} }
...@@ -18,8 +18,12 @@ ...@@ -18,8 +18,12 @@
package org.skywalking.apm.collector.remote.service; package org.skywalking.apm.collector.remote.service;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.module.Service;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public interface InstPerformanceRemoteService<RemoteData, Builder> extends SerializableAndDeserialize<RemoteData, Builder> { public interface RemoteSerializeService<Builder> extends Service {
Builder serialize(Data data);
} }
...@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.remote.grpc; ...@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.remote.grpc;
import java.util.Properties; import java.util.Properties;
import org.skywalking.apm.collector.cluster.ClusterModule; import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService; import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.skywalking.apm.collector.core.module.Module; import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException; import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
...@@ -9,22 +10,15 @@ import org.skywalking.apm.collector.core.module.ModuleProvider; ...@@ -9,22 +10,15 @@ import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.grpc.manager.GRPCManagerModule; import org.skywalking.apm.collector.grpc.manager.GRPCManagerModule;
import org.skywalking.apm.collector.grpc.manager.service.GRPCManagerService; import org.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
import org.skywalking.apm.collector.remote.RemoteDataMappingContainer;
import org.skywalking.apm.collector.remote.RemoteModule; import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.grpc.data.instance.InstPerformanceRemoteData;
import org.skywalking.apm.collector.remote.grpc.data.node.NodeComponentRemoteData;
import org.skywalking.apm.collector.remote.grpc.data.node.NodeMappingRemoteData;
import org.skywalking.apm.collector.remote.grpc.data.noderef.NodeReferenceRemoteData;
import org.skywalking.apm.collector.remote.grpc.data.register.ApplicationRemoteData;
import org.skywalking.apm.collector.remote.grpc.data.register.InstanceRemoteData;
import org.skywalking.apm.collector.remote.grpc.data.register.ServiceNameRemoteData;
import org.skywalking.apm.collector.remote.grpc.data.service.ServiceEntryRemoteData;
import org.skywalking.apm.collector.remote.grpc.data.serviceref.ServiceReferenceRemoteData;
import org.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler; import org.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler;
import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClientService; import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClientService;
import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteListener;
import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSerializeService;
import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteServerService; import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteServerService;
import org.skywalking.apm.collector.remote.service.DataReceiverRegisterListener; import org.skywalking.apm.collector.remote.service.DataReceiverRegisterListener;
import org.skywalking.apm.collector.remote.service.RemoteClientService; import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.remote.service.RemoteSerializeService;
import org.skywalking.apm.collector.remote.service.RemoteServerService; import org.skywalking.apm.collector.remote.service.RemoteServerService;
import org.skywalking.apm.collector.server.Server; import org.skywalking.apm.collector.server.Server;
...@@ -33,13 +27,13 @@ import org.skywalking.apm.collector.server.Server; ...@@ -33,13 +27,13 @@ import org.skywalking.apm.collector.server.Server;
*/ */
public class RemoteModuleGRPCProvider extends ModuleProvider { public class RemoteModuleGRPCProvider extends ModuleProvider {
public static final String NAME = "gRPC";
private static final String HOST = "host"; private static final String HOST = "host";
private static final String PORT = "port"; private static final String PORT = "port";
private RemoteDataMappingContainer container;
private final DataReceiverRegisterListener listener = new DataReceiverRegisterListener(); private final DataReceiverRegisterListener listener = new DataReceiverRegisterListener();
@Override public String name() { @Override public String name() {
return "gRPC"; return NAME;
} }
@Override public Class<? extends Module> module() { @Override public Class<? extends Module> module() {
...@@ -47,9 +41,9 @@ public class RemoteModuleGRPCProvider extends ModuleProvider { ...@@ -47,9 +41,9 @@ public class RemoteModuleGRPCProvider extends ModuleProvider {
} }
@Override public void prepare(Properties config) throws ServiceNotProvidedException { @Override public void prepare(Properties config) throws ServiceNotProvidedException {
container = loadRemoteData();
this.registerServiceImplementation(RemoteServerService.class, new GRPCRemoteServerService(listener)); this.registerServiceImplementation(RemoteServerService.class, new GRPCRemoteServerService(listener));
this.registerServiceImplementation(RemoteClientService.class, new GRPCRemoteClientService(container)); this.registerServiceImplementation(RemoteClientService.class, new GRPCRemoteClientService());
this.registerServiceImplementation(RemoteSerializeService.class, new GRPCRemoteSerializeService());
} }
@Override public void start(Properties config) throws ServiceNotProvidedException { @Override public void start(Properties config) throws ServiceNotProvidedException {
...@@ -59,10 +53,13 @@ public class RemoteModuleGRPCProvider extends ModuleProvider { ...@@ -59,10 +53,13 @@ public class RemoteModuleGRPCProvider extends ModuleProvider {
try { try {
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class); GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer = managerService.getOrCreateIfAbsent(host, port); Server gRPCServer = managerService.getOrCreateIfAbsent(host, port);
gRPCServer.addHandler(new RemoteCommonServiceHandler(container, listener)); gRPCServer.addHandler(new RemoteCommonServiceHandler(listener));
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class); ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(RemoteModule.NAME, this.name(), new RemoteModuleGRPCRegistration(host, port)); moduleRegisterService.register(RemoteModule.NAME, this.name(), new RemoteModuleGRPCRegistration(host, port));
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
moduleListenerService.addListener(new GRPCRemoteListener());
} catch (ModuleNotFoundException e) { } catch (ModuleNotFoundException e) {
throw new ServiceNotProvidedException(e.getMessage()); throw new ServiceNotProvidedException(e.getMessage());
} }
...@@ -75,18 +72,4 @@ public class RemoteModuleGRPCProvider extends ModuleProvider { ...@@ -75,18 +72,4 @@ public class RemoteModuleGRPCProvider extends ModuleProvider {
@Override public String[] requiredModules() { @Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, GRPCManagerModule.NAME}; return new String[] {ClusterModule.NAME, GRPCManagerModule.NAME};
} }
private RemoteDataMappingContainer loadRemoteData() {
RemoteDataMappingContainer container = new RemoteDataMappingContainer();
container.addMapping(new InstPerformanceRemoteData());
container.addMapping(new NodeComponentRemoteData());
container.addMapping(new NodeMappingRemoteData());
container.addMapping(new NodeReferenceRemoteData());
container.addMapping(new ApplicationRemoteData());
container.addMapping(new InstanceRemoteData());
container.addMapping(new ServiceNameRemoteData());
container.addMapping(new ServiceEntryRemoteData());
container.addMapping(new ServiceReferenceRemoteData());
return container;
}
} }
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.grpc.data;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.remote.service.SerializableAndDeserialize;
/**
* @author peng-yongsheng
*/
public abstract class GRPCRemoteData implements SerializableAndDeserialize<RemoteData, RemoteData.Builder> {
protected final Data build(RemoteData remoteData) {
return new Data(remoteData.getDataStrings(0), remoteData.getStringCapacity(), remoteData.getLongCapacity(), remoteData.getDoubleCapacity(), remoteData.getIntegerCapacity(), remoteData.getBooleanCapacity(), remoteData.getByteCapacity());
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.grpc.data.instance;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.core.data.Data;
/**
* @author peng-yongsheng
*/
public class InstPerformanceRemoteData extends GRPCRemoteData {
@Override public RemoteDataMapping mapping() {
return RemoteDataMapping.InstPerformance;
}
@Override public Data deserialize(RemoteData remoteData) {
Data data = build(remoteData);
data.setDataInteger(0, remoteData.getDataIntegers(0));
data.setDataInteger(1, remoteData.getDataIntegers(1));
data.setDataInteger(2, remoteData.getDataIntegers(2));
data.setDataLong(0, remoteData.getDataLongs(0));
data.setDataLong(1, remoteData.getDataLongs(1));
return data;
}
@Override public RemoteData.Builder serialize(Data data) {
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(data.getDataString(0));
builder.addDataIntegers(data.getDataInteger(0));
builder.addDataIntegers(data.getDataInteger(1));
builder.addDataIntegers(data.getDataInteger(2));
builder.addDataLongs(data.getDataLong(0));
builder.addDataLongs(data.getDataLong(1));
return builder;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.grpc.data.node;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
public class NodeComponentRemoteData extends GRPCRemoteData {
@Override public RemoteDataMapping mapping() {
return RemoteDataMapping.NodeComponent;
}
@Override public RemoteData.Builder serialize(Data data) {
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(data.getDataString(0));
builder.addDataIntegers(data.getDataInteger(0));
builder.addDataStrings(data.getDataString(1));
builder.addDataIntegers(data.getDataInteger(1));
builder.addDataStrings(data.getDataString(2));
builder.addDataLongs(data.getDataLong(0));
return builder;
}
@Override public Data deserialize(RemoteData remoteData) {
Data data = build(remoteData);
data.setDataInteger(0, remoteData.getDataIntegers(0));
data.setDataString(1, remoteData.getDataStrings(1));
data.setDataInteger(1, remoteData.getDataIntegers(1));
data.setDataString(2, remoteData.getDataStrings(2));
data.setDataLong(0, remoteData.getDataLongs(0));
return data;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.grpc.data.node;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
public class NodeMappingRemoteData extends GRPCRemoteData {
@Override public RemoteDataMapping mapping() {
return RemoteDataMapping.NodeMapping;
}
@Override public RemoteData.Builder serialize(Data data) {
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(data.getDataString(0));
builder.addDataIntegers(data.getDataInteger(0));
builder.addDataIntegers(data.getDataInteger(1));
builder.addDataStrings(data.getDataString(1));
builder.addDataLongs(data.getDataLong(0));
return builder;
}
@Override public Data deserialize(RemoteData remoteData) {
Data data = build(remoteData);
data.setDataInteger(0, remoteData.getDataIntegers(0));
data.setDataInteger(1, remoteData.getDataIntegers(1));
data.setDataString(1, remoteData.getDataStrings(1));
data.setDataLong(0, remoteData.getDataLongs(0));
return data;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.grpc.data.noderef;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
public class NodeReferenceRemoteData extends GRPCRemoteData {
@Override public RemoteDataMapping mapping() {
return RemoteDataMapping.NodeReference;
}
@Override public RemoteData.Builder serialize(Data data) {
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(data.getDataString(0));
builder.addDataIntegers(data.getDataInteger(0));
builder.addDataIntegers(data.getDataInteger(1));
builder.addDataStrings(data.getDataString(1));
builder.addDataIntegers(data.getDataInteger(2));
builder.addDataIntegers(data.getDataInteger(3));
builder.addDataIntegers(data.getDataInteger(4));
builder.addDataIntegers(data.getDataInteger(5));
builder.addDataIntegers(data.getDataInteger(6));
builder.addDataIntegers(data.getDataInteger(7));
builder.addDataLongs(data.getDataLong(0));
return builder;
}
@Override public Data deserialize(RemoteData remoteData) {
Data data = build(remoteData);
data.setDataInteger(0, remoteData.getDataIntegers(0));
data.setDataInteger(1, remoteData.getDataIntegers(1));
data.setDataString(1, remoteData.getDataStrings(1));
data.setDataInteger(2, remoteData.getDataIntegers(2));
data.setDataInteger(3, remoteData.getDataIntegers(3));
data.setDataInteger(4, remoteData.getDataIntegers(4));
data.setDataInteger(5, remoteData.getDataIntegers(5));
data.setDataInteger(6, remoteData.getDataIntegers(6));
data.setDataInteger(7, remoteData.getDataIntegers(7));
data.setDataLong(0, remoteData.getDataLongs(0));
return data;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.grpc.data.register;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
public class InstanceRemoteData extends GRPCRemoteData {
@Override public RemoteDataMapping mapping() {
return RemoteDataMapping.Instance;
}
@Override public RemoteData.Builder serialize(Data data) {
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(data.getDataString(0));
builder.addDataIntegers(data.getDataInteger(0));
builder.addDataStrings(data.getDataString(1));
builder.addDataIntegers(data.getDataInteger(1));
builder.addDataLongs(data.getDataLong(0));
builder.addDataLongs(data.getDataLong(1));
builder.addDataStrings(data.getDataString(2));
return builder;
}
@Override public Data deserialize(RemoteData remoteData) {
Data data = build(remoteData);
data.setDataString(0, remoteData.getDataStrings(0));
data.setDataInteger(0, remoteData.getDataIntegers(0));
data.setDataString(1, remoteData.getDataStrings(1));
data.setDataInteger(1, remoteData.getDataIntegers(1));
data.setDataLong(0, remoteData.getDataLongs(0));
data.setDataLong(1, remoteData.getDataLongs(1));
data.setDataString(2, remoteData.getDataStrings(2));
return data;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.grpc.data.register;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
public class ServiceNameRemoteData extends GRPCRemoteData {
@Override public RemoteDataMapping mapping() {
return RemoteDataMapping.ServiceName;
}
@Override public RemoteData.Builder serialize(Data data) {
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(data.getDataString(0));
builder.addDataStrings(data.getDataString(1));
builder.addDataIntegers(data.getDataInteger(0));
builder.addDataIntegers(data.getDataInteger(1));
return builder;
}
@Override public Data deserialize(RemoteData remoteData) {
Data data = build(remoteData);
data.setDataString(0, remoteData.getDataStrings(0));
data.setDataString(1, remoteData.getDataStrings(1));
data.setDataInteger(0, remoteData.getDataIntegers(0));
data.setDataInteger(1, remoteData.getDataIntegers(1));
return data;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.grpc.data.service;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
public class ServiceEntryRemoteData extends GRPCRemoteData {
@Override public RemoteDataMapping mapping() {
return RemoteDataMapping.ServiceEntry;
}
@Override public RemoteData.Builder serialize(Data data) {
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(data.getDataString(0));
builder.addDataIntegers(data.getDataInteger(0));
builder.addDataIntegers(data.getDataInteger(1));
builder.addDataStrings(data.getDataString(1));
builder.addDataLongs(data.getDataLong(0));
builder.addDataLongs(data.getDataLong(1));
return builder;
}
@Override public Data deserialize(RemoteData remoteData) {
Data data = build(remoteData);
data.setDataInteger(0, remoteData.getDataIntegers(0));
data.setDataInteger(1, remoteData.getDataIntegers(1));
data.setDataString(1, remoteData.getDataStrings(1));
data.setDataLong(0, remoteData.getDataLongs(0));
data.setDataLong(1, remoteData.getDataLongs(1));
return data;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.grpc.data.serviceref;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceRemoteData extends GRPCRemoteData {
@Override public RemoteDataMapping mapping() {
return RemoteDataMapping.ServiceReference;
}
@Override public RemoteData.Builder serialize(Data data) {
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(data.getDataString(0));
builder.addDataIntegers(data.getDataInteger(0));
builder.addDataStrings(data.getDataString(1));
builder.addDataIntegers(data.getDataInteger(1));
builder.addDataStrings(data.getDataString(2));
builder.addDataIntegers(data.getDataInteger(2));
builder.addDataStrings(data.getDataString(3));
builder.addDataLongs(data.getDataLong(0));
builder.addDataLongs(data.getDataLong(1));
builder.addDataLongs(data.getDataLong(2));
builder.addDataLongs(data.getDataLong(3));
builder.addDataLongs(data.getDataLong(4));
builder.addDataLongs(data.getDataLong(5));
builder.addDataLongs(data.getDataLong(6));
builder.addDataLongs(data.getDataLong(7));
return builder;
}
@Override public Data deserialize(RemoteData remoteData) {
Data data = build(remoteData);
data.setDataInteger(0, remoteData.getDataIntegers(0));
data.setDataString(1, remoteData.getDataStrings(1));
data.setDataInteger(1, remoteData.getDataIntegers(1));
data.setDataString(2, remoteData.getDataStrings(2));
data.setDataInteger(2, remoteData.getDataIntegers(2));
data.setDataString(3, remoteData.getDataStrings(3));
data.setDataLong(0, remoteData.getDataLongs(0));
data.setDataLong(1, remoteData.getDataLongs(1));
data.setDataLong(2, remoteData.getDataLongs(2));
data.setDataLong(3, remoteData.getDataLongs(3));
data.setDataLong(4, remoteData.getDataLongs(4));
data.setDataLong(5, remoteData.getDataLongs(5));
data.setDataLong(6, remoteData.getDataLongs(6));
data.setDataLong(7, remoteData.getDataLongs(7));
return data;
}
}
...@@ -19,11 +19,12 @@ ...@@ -19,11 +19,12 @@
package org.skywalking.apm.collector.remote.grpc.handler; package org.skywalking.apm.collector.remote.grpc.handler;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.remote.RemoteDataMappingContainer; import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.grpc.proto.Empty; import org.skywalking.apm.collector.remote.grpc.proto.Empty;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc; import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage; import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage;
import org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteDeserializeService;
import org.skywalking.apm.collector.remote.service.DataReceiverRegisterListener; import org.skywalking.apm.collector.remote.service.DataReceiverRegisterListener;
import org.skywalking.apm.collector.server.grpc.GRPCHandler; import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -36,20 +37,24 @@ public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCo ...@@ -36,20 +37,24 @@ public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCo
private final Logger logger = LoggerFactory.getLogger(RemoteCommonServiceHandler.class); private final Logger logger = LoggerFactory.getLogger(RemoteCommonServiceHandler.class);
private final RemoteDataMappingContainer container;
private final DataReceiverRegisterListener listener; private final DataReceiverRegisterListener listener;
private final GRPCRemoteDeserializeService service;
public RemoteCommonServiceHandler(RemoteDataMappingContainer container, DataReceiverRegisterListener listener) { public RemoteCommonServiceHandler(DataReceiverRegisterListener listener) {
this.container = container;
this.listener = listener; this.listener = listener;
this.service = new GRPCRemoteDeserializeService();
} }
@Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) { @Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
return new StreamObserver<RemoteMessage>() { return new StreamObserver<RemoteMessage>() {
@Override public void onNext(RemoteMessage message) { @Override public void onNext(RemoteMessage message) {
String roleName = message.getWorkerRole(); int graphId = message.getGraphId();
int nodeId = message.getNodeId();
RemoteData remoteData = message.getRemoteData(); RemoteData remoteData = message.getRemoteData();
listener.getDataReceiver().receive(roleName, container.get(remoteData.getMappingId()).deserialize(remoteData));
Data output = listener.getDataReceiver().output(graphId, nodeId);
service.deserialize(remoteData, output);
listener.getDataReceiver().receive(output);
} }
@Override public void onError(Throwable throwable) { @Override public void onError(Throwable throwable) {
......
...@@ -20,9 +20,6 @@ package org.skywalking.apm.collector.remote.grpc.service; ...@@ -20,9 +20,6 @@ package org.skywalking.apm.collector.remote.grpc.service;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.core.data.Data; import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
import org.skywalking.apm.collector.remote.RemoteDataMappingContainer;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage; import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage;
import org.skywalking.apm.collector.remote.service.RemoteClient; import org.skywalking.apm.collector.remote.service.RemoteClient;
...@@ -31,19 +28,19 @@ import org.skywalking.apm.collector.remote.service.RemoteClient; ...@@ -31,19 +28,19 @@ import org.skywalking.apm.collector.remote.service.RemoteClient;
*/ */
public class GRPCRemoteClient implements RemoteClient { public class GRPCRemoteClient implements RemoteClient {
private final RemoteDataMappingContainer container; private final GRPCRemoteSerializeService service;
private final StreamObserver<RemoteMessage> streamObserver; private final StreamObserver<RemoteMessage> streamObserver;
public GRPCRemoteClient(RemoteDataMappingContainer container, StreamObserver<RemoteMessage> streamObserver) { public GRPCRemoteClient(StreamObserver<RemoteMessage> streamObserver) {
this.container = container;
this.streamObserver = streamObserver; this.streamObserver = streamObserver;
this.service = new GRPCRemoteSerializeService();
} }
@Override public void send(String roleName, Data data, int remoteDataMappingId) { @Override public void send(int graphId, int nodeId, Data data) {
RemoteData remoteData = (RemoteData)container.get(remoteDataMappingId).serialize(data);
RemoteMessage.Builder builder = RemoteMessage.newBuilder(); RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setWorkerRole(roleName); builder.setGraphId(graphId);
builder.setRemoteData(remoteData); builder.setNodeId(nodeId);
builder.setRemoteData(service.serialize(data));
streamObserver.onNext(builder.build()); streamObserver.onNext(builder.build());
} }
......
...@@ -21,7 +21,6 @@ package org.skywalking.apm.collector.remote.grpc.service; ...@@ -21,7 +21,6 @@ package org.skywalking.apm.collector.remote.grpc.service;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.client.ClientException; import org.skywalking.apm.collector.client.ClientException;
import org.skywalking.apm.collector.client.grpc.GRPCClient; import org.skywalking.apm.collector.client.grpc.GRPCClient;
import org.skywalking.apm.collector.remote.RemoteDataMappingContainer;
import org.skywalking.apm.collector.remote.grpc.proto.Empty; import org.skywalking.apm.collector.remote.grpc.proto.Empty;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc; import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage; import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage;
...@@ -37,12 +36,6 @@ public class GRPCRemoteClientService implements RemoteClientService { ...@@ -37,12 +36,6 @@ public class GRPCRemoteClientService implements RemoteClientService {
private final Logger logger = LoggerFactory.getLogger(GRPCRemoteClientService.class); private final Logger logger = LoggerFactory.getLogger(GRPCRemoteClientService.class);
private final RemoteDataMappingContainer container;
public GRPCRemoteClientService(RemoteDataMappingContainer container) {
this.container = container;
}
@Override public RemoteClient create(String host, int port) { @Override public RemoteClient create(String host, int port) {
GRPCClient client = new GRPCClient(host, port); GRPCClient client = new GRPCClient(host, port);
try { try {
...@@ -52,7 +45,7 @@ public class GRPCRemoteClientService implements RemoteClientService { ...@@ -52,7 +45,7 @@ public class GRPCRemoteClientService implements RemoteClientService {
} }
RemoteCommonServiceGrpc.RemoteCommonServiceStub stub = RemoteCommonServiceGrpc.newStub(client.getChannel()); RemoteCommonServiceGrpc.RemoteCommonServiceStub stub = RemoteCommonServiceGrpc.newStub(client.getChannel());
StreamObserver<RemoteMessage> streamObserver = createStreamObserver(stub); StreamObserver<RemoteMessage> streamObserver = createStreamObserver(stub);
return new GRPCRemoteClient(container, streamObserver); return new GRPCRemoteClient(streamObserver);
} }
private StreamObserver<RemoteMessage> createStreamObserver(RemoteCommonServiceGrpc.RemoteCommonServiceStub stub) { private StreamObserver<RemoteMessage> createStreamObserver(RemoteCommonServiceGrpc.RemoteCommonServiceStub stub) {
......
...@@ -20,18 +20,31 @@ package org.skywalking.apm.collector.remote.grpc.service; ...@@ -20,18 +20,31 @@ package org.skywalking.apm.collector.remote.grpc.service;
import org.skywalking.apm.collector.core.data.Data; import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.remote.service.InstPerformanceRemoteService; import org.skywalking.apm.collector.remote.service.RemoteDeserializeService;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class InstPerformanceGRPCRemoteService implements InstPerformanceRemoteService<RemoteData, RemoteData.Builder> { public class GRPCRemoteDeserializeService implements RemoteDeserializeService<RemoteData> {
@Override public void deserialize(RemoteData remoteData, Data data) { @Override public void deserialize(RemoteData remoteData, Data data) {
for (int i = 0; i < remoteData.getDataStringsCount(); i++) {
} data.setDataString(i, remoteData.getDataStrings(i));
}
@Override public RemoteData.Builder serialize(Data data) { for (int i = 0; i < remoteData.getDataIntegersCount(); i++) {
return null; data.setDataInteger(i, remoteData.getDataIntegers(i));
}
for (int i = 0; i < remoteData.getDataLongsCount(); i++) {
data.setDataLong(i, remoteData.getDataLongs(i));
}
for (int i = 0; i < remoteData.getDataBooleansCount(); i++) {
data.setDataBoolean(i, remoteData.getDataBooleans(i));
}
for (int i = 0; i < remoteData.getDataDoublesCount(); i++) {
data.setDataDouble(i, remoteData.getDataDoubles(i));
}
for (int i = 0; i < remoteData.getDataBytesCount(); i++) {
data.setDataBytes(i, remoteData.getDataBytes(i).toByteArray());
}
} }
} }
...@@ -16,24 +16,45 @@ ...@@ -16,24 +16,45 @@
* Project repository: https://github.com/OpenSkywalking/skywalking * Project repository: https://github.com/OpenSkywalking/skywalking
*/ */
package org.skywalking.apm.collector.remote; package org.skywalking.apm.collector.remote.grpc.service;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.skywalking.apm.collector.remote.service.SerializableAndDeserialize; import java.util.concurrent.ConcurrentHashMap;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCProvider;
import org.skywalking.apm.collector.remote.service.RemoteClient;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class RemoteDataMappingContainer { public class GRPCRemoteListener extends ClusterModuleListener {
private Map<Integer, SerializableAndDeserialize> mapping = new HashMap<>(); public static final String PATH = "/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME;
private final GRPCRemoteClientService service;
private final Map<String, RemoteClient> clientMap;
public void addMapping(SerializableAndDeserialize instance) { public GRPCRemoteListener() {
mapping.put(instance.mapping().ordinal(), instance); this.service = new GRPCRemoteClientService();
this.clientMap = new ConcurrentHashMap<>();
} }
public SerializableAndDeserialize get(Integer mappingId) { @Override public String path() {
return mapping.get(mappingId); return PATH;
}
@Override public void serverJoinNotify(String serverAddress) {
if (!clientMap.containsKey(serverAddress)) {
String host = serverAddress.split(":")[0];
int port = Integer.parseInt(serverAddress.split(":")[1]);
RemoteClient remoteClient = service.create(host, port);
clientMap.put(serverAddress, remoteClient);
}
}
@Override public void serverQuitNotify(String serverAddress) {
if (clientMap.containsKey(serverAddress)) {
clientMap.remove(serverAddress);
}
} }
} }
...@@ -16,35 +16,38 @@ ...@@ -16,35 +16,38 @@
* Project repository: https://github.com/OpenSkywalking/skywalking * Project repository: https://github.com/OpenSkywalking/skywalking
*/ */
package org.skywalking.apm.collector.remote.grpc.data.register; package org.skywalking.apm.collector.remote.grpc.service;
import com.google.protobuf.ByteString;
import org.skywalking.apm.collector.core.data.Data; import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
import org.skywalking.apm.collector.remote.grpc.data.GRPCRemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.remote.service.RemoteSerializeService;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class ApplicationRemoteData extends GRPCRemoteData { public class GRPCRemoteSerializeService implements RemoteSerializeService<RemoteData.Builder> {
@Override public RemoteDataMapping mapping() {
return RemoteDataMapping.Application;
}
@Override public RemoteData.Builder serialize(Data data) { @Override public RemoteData.Builder serialize(Data data) {
RemoteData.Builder builder = RemoteData.newBuilder(); RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(data.getDataString(0)); for (int i = 0; i < data.getDataStringsCount(); i++) {
builder.addDataStrings(data.getDataString(1)); builder.setDataStrings(i, data.getDataString(i));
builder.addDataIntegers(data.getDataInteger(0)); }
for (int i = 0; i < data.getDataIntegersCount(); i++) {
builder.setDataIntegers(i, data.getDataInteger(i));
}
for (int i = 0; i < data.getDataLongsCount(); i++) {
builder.setDataLongs(i, data.getDataLong(i));
}
for (int i = 0; i < data.getDataBooleansCount(); i++) {
builder.setDataBooleans(i, data.getDataBoolean(i));
}
for (int i = 0; i < data.getDataDoublesCount(); i++) {
builder.setDataDoubles(i, data.getDataDouble(i));
}
for (int i = 0; i < data.getDataBytesCount(); i++) {
builder.setDataBytes(i, ByteString.copyFrom(data.getDataBytes(i)));
}
return builder; return builder;
} }
@Override public Data deserialize(RemoteData remoteData) {
Data data = build(remoteData);
data.setDataString(0, remoteData.getDataStrings(0));
data.setDataString(1, remoteData.getDataStrings(1));
data.setDataInteger(0, remoteData.getDataIntegers(0));
return data;
}
} }
...@@ -16,22 +16,22 @@ ...@@ -16,22 +16,22 @@
* Project repository: https://github.com/OpenSkywalking/skywalking * Project repository: https://github.com/OpenSkywalking/skywalking
*/ */
package org.skywalking.apm.collector.stream.worker.base.selector; package org.skywalking.apm.collector.remote.grpc.service.selector;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.stream.worker.base.WorkerRef; import org.skywalking.apm.collector.remote.service.RemoteClient;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class ForeverFirstSelector implements WorkerSelector<WorkerRef> { public class ForeverFirstSelector implements RemoteClientSelector {
private final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class); private final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class);
@Override public WorkerRef select(List<WorkerRef> members, Object message) { @Override public RemoteClient select(List<RemoteClient> clients, Object message) {
logger.debug("member size: {}", members.size()); logger.debug("clients size: {}", clients.size());
return members.get(0); return clients.get(0);
} }
} }
...@@ -16,37 +16,23 @@ ...@@ -16,37 +16,23 @@
* Project repository: https://github.com/OpenSkywalking/skywalking * Project repository: https://github.com/OpenSkywalking/skywalking
*/ */
package org.skywalking.apm.collector.stream.worker.base.selector; package org.skywalking.apm.collector.remote.grpc.service.selector;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.core.data.AbstractHashMessage; import org.skywalking.apm.collector.core.data.AbstractHashMessage;
import org.skywalking.apm.collector.stream.worker.base.WorkerRef; import org.skywalking.apm.collector.remote.service.RemoteClient;
import org.skywalking.apm.collector.stream.worker.base.AbstractWorker;
/** /**
* The <code>HashCodeSelector</code> is a simple implementation of {@link WorkerSelector}. It choose {@link WorkerRef}
* by message {@link AbstractHashMessage} key's hashcode, so it can use to send the same hashcode message to same {@link
* WorkerRef}. Usually, use to database operate which avoid dirty data.
*
* @author peng-yongsheng * @author peng-yongsheng
* @since v3.0-2017
*/ */
public class HashCodeSelector implements WorkerSelector<WorkerRef> { public class HashCodeSelector implements RemoteClientSelector {
/** @Override public RemoteClient select(List<RemoteClient> clients, Object message) {
* Use message hashcode to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
public WorkerRef select(List<WorkerRef> members, Object message) {
if (message instanceof AbstractHashMessage) { if (message instanceof AbstractHashMessage) {
AbstractHashMessage hashMessage = (AbstractHashMessage)message; AbstractHashMessage hashMessage = (AbstractHashMessage)message;
int size = members.size(); int size = clients.size();
int selectIndex = Math.abs(hashMessage.getHashCode()) % size; int selectIndex = Math.abs(hashMessage.getHashCode()) % size;
return members.get(selectIndex); return clients.get(selectIndex);
} else { } else {
throw new IllegalArgumentException("the message send into HashCodeSelector must implementation of AbstractHashMessage, the message object class is: " + message.getClass().getName()); throw new IllegalArgumentException("the message send into HashCodeSelector must implementation of AbstractHashMessage, the message object class is: " + message.getClass().getName());
} }
......
...@@ -16,11 +16,14 @@ ...@@ -16,11 +16,14 @@
* Project repository: https://github.com/OpenSkywalking/skywalking * Project repository: https://github.com/OpenSkywalking/skywalking
*/ */
package org.skywalking.apm.collector.remote; package org.skywalking.apm.collector.remote.grpc.service.selector;
import java.util.List;
import org.skywalking.apm.collector.remote.service.RemoteClient;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public enum RemoteDataMapping { public interface RemoteClientSelector {
GlobalTrace, Segment, SegmentCost, InstPerformance, NodeComponent, NodeMapping, NodeReference, Application, Instance, ServiceName, ServiceEntry, ServiceReference, CpuMetric, MemoryMetric, MemoryPoolMetric, GCMetric RemoteClient select(List<RemoteClient> clients, Object message);
} }
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.grpc.service.selector;
import java.util.List;
import org.skywalking.apm.collector.remote.service.RemoteClient;
/**
* @author peng-yongsheng
*/
public class RollingSelector implements RemoteClientSelector {
private int index = 0;
@Override public RemoteClient select(List<RemoteClient> clients, Object message) {
int size = clients.size();
index++;
int selectIndex = Math.abs(index) % size;
return clients.get(selectIndex);
}
}
...@@ -9,24 +9,18 @@ service RemoteCommonService { ...@@ -9,24 +9,18 @@ service RemoteCommonService {
} }
message RemoteMessage { message RemoteMessage {
string workerRole = 1; int32 graphId = 1;
RemoteData remoteData = 2; int32 nodeId = 2;
RemoteData remoteData = 3;
} }
message RemoteData { message RemoteData {
int32 mappingId = 1; repeated string dataStrings = 1;
int32 stringCapacity = 2; repeated int64 dataLongs = 2;
int32 longCapacity = 3; repeated double dataDoubles = 3;
int32 doubleCapacity = 4; repeated int32 dataIntegers = 4;
int32 integerCapacity = 5; repeated bytes dataBytes = 5;
int32 byteCapacity = 6; repeated bool dataBooleans = 6;
int32 booleanCapacity = 7;
repeated string dataStrings = 8;
repeated int64 dataLongs = 9;
repeated double dataDoubles = 10;
repeated int32 dataIntegers = 11;
repeated bytes dataBytes = 12;
repeated bool dataBooleans = 13;
} }
message Empty { message Empty {
......
...@@ -30,4 +30,11 @@ ...@@ -30,4 +30,11 @@
<artifactId>collector-remote-kafka-provider</artifactId> <artifactId>collector-remote-kafka-provider</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-remote-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project> </project>
\ No newline at end of file
...@@ -25,5 +25,5 @@ import org.skywalking.apm.collector.storage.base.dao.DAO; ...@@ -25,5 +25,5 @@ import org.skywalking.apm.collector.storage.base.dao.DAO;
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public interface DAOService extends Service { public interface DAOService extends Service {
DAO get(Class daoInterfaceClass); DAO get(Class<DAO> daoInterfaceClass);
} }
...@@ -29,12 +29,4 @@ ...@@ -29,12 +29,4 @@
<artifactId>collector-stream-define</artifactId> <artifactId>collector-stream-define</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>queue-component</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project> </project>
\ No newline at end of file
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.base.selector;
import java.util.List;
import org.skywalking.apm.collector.stream.worker.base.WorkerRef;
import org.skywalking.apm.collector.stream.worker.base.AbstractWorker;
/**
* The <code>RollingSelector</code> is a simple implementation of {@link WorkerSelector}.
* It choose {@link WorkerRef} nearly random, by round-robin.
*
* @author peng-yongsheng
* @since v3.0-2017
*/
public class RollingSelector implements WorkerSelector<WorkerRef> {
private int index = 0;
/**
* Use round-robin to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
public WorkerRef select(List<WorkerRef> members, Object message) {
int size = members.size();
index++;
int selectIndex = Math.abs(index) % size;
return members.get(selectIndex);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.base.selector;
import java.util.List;
import org.skywalking.apm.collector.stream.worker.base.WorkerRef;
import org.skywalking.apm.collector.stream.worker.base.AbstractWorker;
/**
* The <code>WorkerSelector</code> should be implemented by any class whose instances
* are intended to provide select a {@link WorkerRef} from a {@link WorkerRef} list.
* <p>
* Actually, the <code>WorkerRef</code> is designed to provide a routing ability in the collector cluster
*
* @author peng-yongsheng
* @since v3.0-2017
*/
public interface WorkerSelector<T extends WorkerRef> {
/**
* select a {@link WorkerRef} from a {@link WorkerRef} list.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
T select(List<T> members, Object message);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册