Commit 38aeb92e authored by pengys5

collector compile successful

Parent 8b22a52d
......@@ -10,8 +10,8 @@ import org.skywalking.apm.collector.core.util.StringUtils;
*/
public class AgentStreamGRPCConfigParser implements ModuleConfigParser {
private final String HOST = "host";
private final String PORT = "port";
private static final String HOST = "host";
private static final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
AgentStreamGRPCConfig.HOST = (String)config.get(HOST);
......
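The hunk above promotes the configuration key fields of AgentStreamGRPCConfigParser from instance final fields to static final constants, the idiomatic form for compile-time string keys. A minimal sketch of a parser built on the same pattern follows; it reuses project types that appear elsewhere in this diff (ModuleConfigParser, ConfigParseException, StringUtils), while ExampleConfigParser and ExampleConfig are hypothetical names used only for illustration.

package org.skywalking.apm.collector.example;

import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.StringUtils;

/**
 * Illustrative parser following the same pattern; ExampleConfigParser and
 * ExampleConfig are hypothetical and are not part of this commit.
 */
public class ExampleConfigParser implements ModuleConfigParser {
    // Compile-time keys declared static final, matching the change in this hunk.
    private static final String HOST = "host";
    private static final String PORT = "port";

    @Override public void parse(Map config) throws ConfigParseException {
        ExampleConfig.HOST = (String)config.get(HOST);
        // Mirrors the cast used by ClusterRedisConfigParser below; assumes the key is present.
        ExampleConfig.PORT = (Integer)config.get(PORT);
        if (StringUtils.isEmpty(ExampleConfig.HOST) || ExampleConfig.PORT == 0) {
            throw new ConfigParseException("host and port must both be configured");
        }
    }
}

class ExampleConfig {
    // Static holders mirroring AgentStreamGRPCConfig / ClusterRedisConfig in this diff.
    public static String HOST;
    public static int PORT;
}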
package org.skywalking.apm.collector.agent.stream.server.grpc.impl;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerRef;
import org.skywalking.apm.collector.worker.grpcserver.WorkerCaller;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class TraceSegmentServiceImpl extends TraceSegmentServiceGrpc.TraceSegmentServiceImplBase implements WorkerCaller {
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceImpl.class);
private ClusterWorkerContext clusterWorkerContext;
private WorkerRef segmentReceiverWorkRef;
@Override public void preStart() throws ProviderNotFoundException {
segmentReceiverWorkRef = clusterWorkerContext.findProvider(SegmentReceiver.WorkerRole.INSTANCE).create(AbstractWorker.noOwner());
}
@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
return new StreamObserver<UpstreamSegment>() {
@Override public void onNext(UpstreamSegment segment) {
if (logger.isDebugEnabled()) {
StringBuffer globalTraceIds = new StringBuffer();
logger.debug("global trace ids count: %s", segment.getGlobalTraceIdsList().size());
segment.getGlobalTraceIdsList().forEach(globalTraceId -> {
globalTraceIds.append(globalTraceId).append(",");
});
logger.debug("receive segment, global trace ids: %s, segment byte size: %s", globalTraceIds, segment.getSegment().size());
try {
segmentReceiverWorkRef.tell(segment);
} catch (WorkerInvokeException e) {
onError(e);
}
}
}
@Override public void onError(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
@Override public void onCompleted() {
responseObserver.onCompleted();
}
};
}
@Override public void inject(ClusterWorkerContext clusterWorkerContext) {
this.clusterWorkerContext = clusterWorkerContext;
}
}
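TraceSegmentServiceImpl above serves the collect RPC of TraceSegmentServiceGrpc as a client-streaming call: the agent pushes UpstreamSegment messages and the collector completes the Downstream observer when the upstream finishes. A minimal client-side sketch is shown below, assuming a plaintext channel to a hypothetical localhost:11800 endpoint; the address, the empty UpstreamSegment, and the shutdown handling are illustrative assumptions, not part of this commit.

package org.skywalking.apm.collector.example;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UpstreamSegment;

/**
 * Sketch of an agent-side caller for the collect stream; endpoint and error
 * handling are assumptions, not part of this commit.
 */
public class TraceSegmentClientSketch {
    public static void main(String[] args) throws InterruptedException {
        // Hypothetical collector address; the real value comes from agent configuration.
        final ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800)
            .usePlaintext(true)
            .build();

        TraceSegmentServiceGrpc.TraceSegmentServiceStub stub = TraceSegmentServiceGrpc.newStub(channel);

        // The collector answers on this observer after the upstream is completed.
        StreamObserver<UpstreamSegment> upstream = stub.collect(new StreamObserver<Downstream>() {
            @Override public void onNext(Downstream downstream) {
                // Acknowledgement from the collector.
            }

            @Override public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override public void onCompleted() {
                channel.shutdown();
            }
        });

        // Each trace segment is pushed as one UpstreamSegment message; an empty one is used here.
        upstream.onNext(UpstreamSegment.newBuilder().build());
        upstream.onCompleted();

        channel.awaitTermination(5, TimeUnit.SECONDS);
    }
}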
......@@ -15,7 +15,7 @@
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster-new</artifactId>
<artifactId>apm-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
......
......@@ -18,5 +18,20 @@
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.196</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -6,12 +6,16 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.skywalking.apm.collector.core.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class H2Client implements Client {
private final Logger logger = LoggerFactory.getLogger(H2Client.class);
private Connection conn;
@Override public void initialize() throws H2ClientException {
......@@ -40,7 +44,7 @@ public class H2Client implements Client {
statement = conn.createStatement();
ResultSet rs = statement.executeQuery(sql);
while (rs.next()) {
System.out.println(rs.getString("ADDRESS") + "," + rs.getString("DATA"));
logger.debug(rs.getString("ADDRESS") + "," + rs.getString("DATA"));
}
statement.closeOnCompletion();
} catch (SQLException e) {
......
......@@ -10,12 +10,12 @@ import org.skywalking.apm.collector.core.util.StringUtils;
*/
public class ClusterRedisConfigParser implements ModuleConfigParser {
private final String HOST = "host";
private final String PORT = "port";
private static final String HOST = "host";
private static final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
ClusterRedisConfig.HOST = (String)config.get(HOST);
ClusterRedisConfig.PORT = ((Integer)config.get(PORT));
ClusterRedisConfig.PORT = (Integer)config.get(PORT);
if (StringUtils.isEmpty(ClusterRedisConfig.HOST) || ClusterRedisConfig.PORT == 0) {
throw new ConfigParseException("");
}
......
......@@ -3,6 +3,7 @@ package org.skywalking.apm.collector.cluster.redis;
import org.skywalking.apm.collector.client.redis.RedisClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
......@@ -40,4 +41,8 @@ public class ClusterRedisModuleDefine extends ClusterModuleDefine {
@Override protected ClusterModuleRegistrationWriter registrationWriter() {
return new ClusterRedisModuleRegistrationWriter(getClient());
}
@Override protected ClusterModuleRegistrationReader registrationReader() {
return null;
}
}
......@@ -3,6 +3,7 @@ package org.skywalking.apm.collector.cluster.standalone;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
......@@ -40,4 +41,8 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine {
@Override protected ClusterModuleRegistrationWriter registrationWriter() {
return new ClusterStandaloneModuleRegistrationWriter(getClient());
}
@Override protected ClusterModuleRegistrationReader registrationReader() {
return null;
}
}
......@@ -10,8 +10,8 @@ import org.skywalking.apm.collector.core.util.StringUtils;
*/
public class ClusterZKConfigParser implements ModuleConfigParser {
private final String HOST_PORT = "hostPort";
private final String SESSION_TIMEOUT = "sessionTimeout";
private static final String HOST_PORT = "hostPort";
private static final String SESSION_TIMEOUT = "sessionTimeout";
@Override public void parse(Map config) throws ConfigParseException {
ClusterZKConfig.HOST_PORT = (String)config.get(HOST_PORT);
......
......@@ -40,10 +40,10 @@ public class ClusterZKDataInitializer extends ClusterDataInitializer {
pathBuilder.append("/").append(catalog);
}
if (zkClient.exists(pathBuilder.toString(), false) == null) {
return false;
} else {
return true;
}
// if (zkClient.exists(pathBuilder.toString(), false) == null) {
// return false;
// } else {
return true;
// }
}
}
......@@ -4,6 +4,7 @@ import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleGroup;
......@@ -40,4 +41,8 @@ public class ClusterZKModuleDefine extends ClusterModuleDefine {
@Override protected ClusterModuleRegistrationWriter registrationWriter() {
return new ClusterZKModuleRegistrationWriter(getClient());
}
@Override protected ClusterModuleRegistrationReader registrationReader() {
return null;
}
}
......@@ -24,7 +24,7 @@ public abstract class AgentStreamModuleDefine extends ModuleDefine {
server.initialize();
String key = ClusterDataInitializer.BASE_CATALOG + "." + name();
ClusterModuleContext.writer.write(key, registration().buildValue());
ClusterModuleContext.WRITER.write(key, registration().buildValue());
} catch (ConfigParseException | ServerException e) {
throw new AgentStreamModuleException(e.getMessage(), e);
}
......
......@@ -5,7 +5,7 @@ import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class AgentStreamModuleException extends ModuleException{
public class AgentStreamModuleException extends ModuleException {
public AgentStreamModuleException(String message) {
super(message);
}
......
......@@ -4,7 +4,7 @@ package org.skywalking.apm.collector.core.cluster;
* @author pengys5
*/
public class ClusterModuleContext {
public static ClusterModuleRegistrationWriter writer;
public static ClusterModuleRegistrationWriter WRITER;
public static ClusterModuleRegistrationReader reader;
public static ClusterModuleRegistrationReader READER;
}
......@@ -38,6 +38,6 @@ public class ClusterModuleInstaller implements ModuleInstaller {
moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey());
moduleDefine.initialize(clusterConfigEntry.getValue());
}
ClusterModuleContext.writer = ((ClusterModuleDefine)moduleDefine).registrationWriter();
ClusterModuleContext.WRITER = ((ClusterModuleDefine)moduleDefine).registrationWriter();
}
}
......@@ -5,7 +5,7 @@ package org.skywalking.apm.collector.core.framework;
*/
public abstract class DefinitionFile {
private final String CATALOG = "META-INF/defines/";
private static final String CATALOG = "META-INF/defines/";
protected abstract String fileName();
......
package org.skywalking.apm.collector.core.module;
import java.util.Map;
import org.skywalking.apm.collector.core.config.Config;
import org.skywalking.apm.collector.core.config.ConfigParseException;
/**
......
......@@ -4,7 +4,6 @@ import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterModuleInstaller;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.worker.WorkerModuleInstaller;
/**
* @author pengys5
......@@ -16,8 +15,6 @@ public class ModuleInstallerAdapter implements ModuleInstaller {
public ModuleInstallerAdapter(ModuleGroup moduleGroup) {
if (ModuleGroup.Cluster.equals(moduleGroup)) {
moduleInstaller = new ClusterModuleInstaller();
} else if (ModuleGroup.Worker.equals(moduleGroup)) {
moduleInstaller = new WorkerModuleInstaller();
}
}
......
......@@ -4,5 +4,5 @@ package org.skywalking.apm.collector.core.queue;
* @author pengys5
*/
public class QueueModuleContext {
public static Creator creator;
public static Creator CREATOR;
}
......@@ -18,7 +18,7 @@ public class BytesUtils {
long num = 0;
for (int ix = 0; ix < 8; ++ix) {
num <<= 8;
num |= (byteNum[ix] & 0xff);
num |= byteNum[ix] & 0xff;
}
return num;
}
......
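The loop above folds eight bytes into a long in big-endian order; the & 0xff mask keeps sign extension of negative byte values from corrupting the already accumulated bits. A short sketch of the reverse conversion plus a worked check is shown below; longToBytes is a hypothetical helper, not a method of BytesUtils in this commit.

package org.skywalking.apm.collector.example;

/**
 * Sketch of the inverse conversion; longToBytes is hypothetical and is not
 * part of BytesUtils in this commit.
 */
public class BytesExample {
    public static byte[] longToBytes(long num) {
        byte[] bytes = new byte[8];
        for (int ix = 7; ix >= 0; --ix) {
            // Take the lowest byte, then shift the next one down.
            bytes[ix] = (byte)(num & 0xff);
            num >>>= 8;
        }
        return bytes;
    }

    public static void main(String[] args) {
        byte[] encoded = longToBytes(258L);   // {0, 0, 0, 0, 0, 0, 1, 2}
        long decoded = 0;
        for (int ix = 0; ix < 8; ++ix) {      // same loop as BytesUtils in the diff above
            decoded <<= 8;
            decoded |= encoded[ix] & 0xff;
        }
        System.out.println(decoded);          // prints 258 (0x0102)
    }
}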
package org.skywalking.apm.collector.core.util;
import com.sun.istack.internal.Nullable;
import java.util.Map;
/**
......@@ -8,7 +7,7 @@ import java.util.Map;
*/
public class CollectionUtils {
public static boolean isEmpty(@Nullable Map map) {
return (map == null || map.size() == 0);
public static boolean isEmpty(Map map) {
return map == null || map.size() == 0;
}
}
package org.skywalking.apm.collector.core.util;
import com.sun.istack.internal.Nullable;
/**
* @author pengys5
*/
public class ObjectUtils {
public static boolean isEmpty(@Nullable Object obj) {
public static boolean isEmpty(Object obj) {
return obj == null;
}
}
package org.skywalking.apm.collector.core.util;
import com.sun.istack.internal.Nullable;
/**
* @author pengys5
*/
......@@ -9,7 +7,7 @@ public class StringUtils {
public static final String EMPTY_STRING = "";
public static boolean isEmpty(@Nullable Object str) {
return (str == null || EMPTY_STRING.equals(str));
public static boolean isEmpty(Object str) {
return str == null || EMPTY_STRING.equals(str);
}
}
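The three hunks above drop the com.sun.istack.internal.Nullable annotation, a JDK-internal class that is not part of the public API and is not guaranteed to be present on every JDK, and remove the redundant parentheses around the boolean expressions. A quick behavioural check of the touched utility methods, purely illustrative:

package org.skywalking.apm.collector.example;

import java.util.HashMap;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.StringUtils;

public class UtilExample {
    public static void main(String[] args) {
        System.out.println(StringUtils.isEmpty(null));        // true: null is treated as empty
        System.out.println(StringUtils.isEmpty(""));          // true: equals EMPTY_STRING
        System.out.println(StringUtils.isEmpty("collector")); // false
        System.out.println(CollectionUtils.isEmpty(new HashMap<String, String>())); // true: size() == 0
    }
}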
......@@ -15,7 +15,7 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
T localAsyncWorker = workerInstance(getClusterContext());
localAsyncWorker.preStart();
QueueEventHandler queueEventHandler = QueueModuleContext.creator.create(queueSize(), localAsyncWorker);
QueueEventHandler queueEventHandler = QueueModuleContext.CREATOR.create(queueSize(), localAsyncWorker);
LocalAsyncWorkerRef workerRef = new LocalAsyncWorkerRef(role(), queueEventHandler);
......
package org.skywalking.apm.collector.core.worker.selector;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.WorkerRef;
import java.util.List;
import org.skywalking.apm.collector.core.worker.WorkerRef;
/**
* The <code>HashCodeSelector</code> is a simple implementation of {@link WorkerSelector}. It chooses {@link WorkerRef}
......@@ -19,7 +17,7 @@ public class HashCodeSelector implements WorkerSelector<WorkerRef> {
* Use message hashcode to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @param message the {@link org.skywalking.apm.collector.core.worker.AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
......
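This hunk touches only the imports and the javadoc reference to AbstractWorker. The selection rule the javadoc describes, choosing a member by the message's hash code, can be sketched as below; the real HashCodeSelector body is truncated in this diff, so this is an illustration of the documented behaviour rather than the project's implementation, and selectByHash is a hypothetical helper.

package org.skywalking.apm.collector.example;

import java.util.List;

/**
 * Illustration of hash-based selection; selectByHash is hypothetical and is
 * not the HashCodeSelector implementation from this commit.
 */
public class HashSelectSketch {
    public static <T> T selectByHash(List<T> members, Object message) {
        // Mask the sign bit so the index is never negative; members is assumed
        // non-empty, as the javadoc requires ("which size is greater than 0").
        int index = (message.hashCode() & 0x7fffffff) % members.size();
        return members.get(index);
    }
}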
package org.skywalking.apm.collector.core.worker.selector;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.WorkerRef;
import java.util.List;
import org.skywalking.apm.collector.core.worker.WorkerRef;
/**
* The <code>RollingSelector</code> is a simple implementation of {@link WorkerSelector}.
......@@ -20,7 +18,7 @@ public class RollingSelector implements WorkerSelector<WorkerRef> {
* Use round-robin to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message message the {@link AbstractWorker} is going to send.
* @param message message the {@link org.skywalking.apm.collector.core.worker.AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
......
......@@ -25,6 +25,6 @@ public class QueueDataCarrierModuleDefine extends QueueModuleDefine {
}
@Override public final void initialize(Map config) throws DefineException, ClientException {
QueueModuleContext.creator = new DataCarrierCreator();
QueueModuleContext.CREATOR = new DataCarrierCreator();
}
}
......@@ -25,6 +25,6 @@ public class QueueDisruptorModuleDefine extends QueueModuleDefine {
}
@Override public final void initialize(Map config) throws DefineException, ClientException {
QueueModuleContext.creator = new DisruptorCreator();
QueueModuleContext.CREATOR = new DisruptorCreator();
}
}
......@@ -19,16 +19,16 @@
</properties>
<dependencies>
<!--<dependency>-->
<!--<groupId>org.skywalking</groupId>-->
<!--<artifactId>apm-collector-core</artifactId>-->
<!--<version>${project.version}</version>-->
<!--</dependency>-->
<!--<dependency>-->
<!--<groupId>org.skywalking</groupId>-->
<!--<artifactId>apm-collector-server</artifactId>-->
<!--<version>${project.version}</version>-->
<!--</dependency>-->
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
......
......@@ -7,7 +7,6 @@
<module>apm-collector-core</module>
<module>apm-collector-queue</module>
<module>apm-collector-storage</module>
<module>apm-collector-cluster</module>
<module>apm-collector-client</module>
<module>apm-collector-server</module>
<module>apm-collector-discovery</module>
......