Commit 243c0007 authored by wu-sheng, committed by GitHub

Merge pull request #344 from wu-sheng/feature/338

Feature/338
package org.skywalking.apm.collector.agentregister.application;
import org.skywalking.apm.collector.agentstream.worker.cache.ApplicationCache;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
@@ -20,8 +19,7 @@ public class ApplicationIDService {
private final Logger logger = LoggerFactory.getLogger(ApplicationIDService.class);
public int getOrCreate(String applicationCode) {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
int applicationId = dao.getApplicationId(applicationCode);
int applicationId = ApplicationCache.get(applicationCode);
if (applicationId == 0) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
......
package org.skywalking.apm.collector.agentregister.jetty;
/**
* @author pengys5
*/
public class AgentRegisterJettyConfig {
public static String HOST;
public static int PORT;
public static String CONTEXT_PATH;
}
package org.skywalking.apm.collector.agentregister.jetty;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class AgentRegisterJettyConfigParser implements ModuleConfigParser {
private static final String HOST = "host";
private static final String PORT = "port";
public static final String CONTEXT_PATH = "contextPath";
@Override public void parse(Map config) throws ConfigParseException {
AgentRegisterJettyConfig.CONTEXT_PATH = "/";
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
AgentRegisterJettyConfig.HOST = "localhost";
} else {
AgentRegisterJettyConfig.HOST = (String)config.get(HOST);
}
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
AgentRegisterJettyConfig.PORT = 12800;
} else {
AgentRegisterJettyConfig.PORT = (Integer)config.get(PORT);
}
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CONTEXT_PATH))) {
AgentRegisterJettyConfig.CONTEXT_PATH = (String)config.get(CONTEXT_PATH);
}
}
}
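A note on the fallback logic above: with a null or empty config map, the register server ends up on localhost:12800 under the root context path. A hypothetical driver (not part of this commit) that exercises those defaults:
import java.util.HashMap;
import java.util.Map;
/**
 * Illustrative only: checks the fallback values of AgentRegisterJettyConfigParser.
 */
public class AgentRegisterJettyConfigDefaultsCheck {
public static void main(String[] args) throws Exception {
// With an empty map, every branch falls through to its default.
Map<String, Object> config = new HashMap<>();
new AgentRegisterJettyConfigParser().parse(config);
// Expected after parse(): localhost:12800/
System.out.println(AgentRegisterJettyConfig.HOST + ":" + AgentRegisterJettyConfig.PORT + AgentRegisterJettyConfig.CONTEXT_PATH);
}
}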
package org.skywalking.apm.collector.agentregister.jetty;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
/**
* @author pengys5
*/
public class AgentRegisterJettyDataListener extends ClusterDataListener {
public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + AgentRegisterModuleGroupDefine.GROUP_NAME + "." + AgentRegisterJettyModuleDefine.MODULE_NAME;
@Override public String path() {
return PATH;
}
@Override public void addressChangedNotify() {
}
}
package org.skywalking.apm.collector.agentregister.jetty;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleDefine;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine;
import org.skywalking.apm.collector.agentregister.jetty.handler.ApplicationRegisterServletHandler;
import org.skywalking.apm.collector.agentregister.jetty.handler.InstanceDiscoveryServletHandler;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.server.jetty.JettyServer;
/**
* @author pengys5
*/
public class AgentRegisterJettyModuleDefine extends AgentRegisterModuleDefine {
public static final String MODULE_NAME = "jetty";
@Override protected String group() {
return AgentRegisterModuleGroupDefine.GROUP_NAME;
}
@Override public String name() {
return MODULE_NAME;
}
@Override protected ModuleConfigParser configParser() {
return new AgentRegisterJettyConfigParser();
}
@Override protected Server server() {
return new JettyServer(AgentRegisterJettyConfig.HOST, AgentRegisterJettyConfig.PORT, AgentRegisterJettyConfig.CONTEXT_PATH);
}
@Override protected ModuleRegistration registration() {
return new AgentRegisterJettyModuleRegistration();
}
@Override public ClusterDataListener listener() {
return new AgentRegisterJettyDataListener();
}
@Override public List<Handler> handlerList() {
List<Handler> handlers = new LinkedList<>();
handlers.add(new ApplicationRegisterServletHandler());
handlers.add(new InstanceDiscoveryServletHandler());
return handlers;
}
}
package org.skywalking.apm.collector.agentregister.jetty;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class AgentRegisterJettyModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
return new Value(AgentRegisterJettyConfig.HOST, AgentRegisterJettyConfig.PORT, AgentRegisterJettyConfig.CONTEXT_PATH);
}
}
package org.skywalking.apm.collector.agentregister.jetty.handler;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agentregister.application.ApplicationIDService;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ApplicationRegisterServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServletHandler.class);
private ApplicationIDService applicationIDService = new ApplicationIDService();
private Gson gson = new Gson();
private static final String APPLICATION_CODE = "c";
private static final String APPLICATION_ID = "i";
@Override public String pathSpec() {
return "/application/register";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
JsonArray responseArray = new JsonArray();
try {
JsonArray applicationCodes = gson.fromJson(req.getReader(), JsonArray.class);
for (int i = 0; i < applicationCodes.size(); i++) {
String applicationCode = applicationCodes.get(i).getAsString();
int applicationId = applicationIDService.getOrCreate(applicationCode);
JsonObject mapping = new JsonObject();
mapping.addProperty(APPLICATION_CODE, applicationCode);
mapping.addProperty(APPLICATION_ID, applicationId);
responseArray.add(mapping);
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return responseArray;
}
}
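Given the one-character field names above ("c" = application code, "i" = application id), a register round trip looks like the following (illustrative request/response, values invented):
POST /application/register
["portal-service", "billing-service"]
response:
[{"c":"portal-service","i":2},{"c":"billing-service","i":3}]
Judging from ApplicationIDService above, a code with no stored id comes back as 0 while registration is handled asynchronously by the stream workers.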
package org.skywalking.apm.collector.agentregister.jetty.handler;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agentregister.instance.InstanceIDService;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class InstanceDiscoveryServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServletHandler.class);
private InstanceIDService instanceIDService = new InstanceIDService();
private Gson gson = new Gson();
private static final String APPLICATION_ID = "ai";
private static final String AGENT_UUID = "au";
private static final String REGISTER_TIME = "rt";
private static final String INSTANCE_ID = "ii";
@Override public String pathSpec() {
return "/instance/register";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
JsonObject responseJson = new JsonObject();
try {
JsonObject instance = gson.fromJson(req.getReader(), JsonObject.class);
int applicationId = instance.get(APPLICATION_ID).getAsInt();
String agentUUID = instance.get(AGENT_UUID).getAsString();
long registerTime = instance.get(REGISTER_TIME).getAsLong();
int instanceId = instanceIDService.getOrCreate(applicationId, agentUUID, registerTime);
responseJson.addProperty(APPLICATION_ID, applicationId);
responseJson.addProperty(INSTANCE_ID, instanceId);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return responseJson;
}
}
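The instance handler uses the same abbreviation scheme ("ai" = application id, "au" = agent UUID, "rt" = register time, "ii" = instance id); an illustrative exchange, values invented:
POST /instance/register
{"ai": 2, "au": "f2a7c1e0-7b2d-4e61-9a43-0d9e2f1c5b88", "rt": 1501858094526}
response:
{"ai": 2, "ii": 17}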
org.skywalking.apm.collector.agentregister.grpc.AgentRegisterGRPCModuleDefine
\ No newline at end of file
org.skywalking.apm.collector.agentregister.grpc.AgentRegisterGRPCModuleDefine
org.skywalking.apm.collector.agentregister.jetty.AgentRegisterJettyModuleDefine
\ No newline at end of file
@@ -25,9 +25,11 @@ public class AgentStreamGRPCServerHandler extends JettyHandler {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
List<String> servers = reader.read(AgentStreamGRPCDataListener.PATH);
JsonArray serverArray = new JsonArray();
servers.forEach(server -> {
serverArray.add(server);
});
servers.forEach(server -> serverArray.add(server));
return serverArray;
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
}
@@ -30,4 +30,8 @@ public class AgentStreamJettyServerHandler extends JettyHandler {
});
return serverArray;
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
}
@@ -2,7 +2,6 @@ package org.skywalking.apm.collector.agentstream;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.storage.IDNameExchangeTimer;
import org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
@@ -36,6 +35,5 @@ public class AgentStreamModuleInstaller implements ModuleInstaller {
}
new PersistenceTimer().start();
new IDNameExchangeTimer().start();
}
}
package org.skywalking.apm.collector.agentstream.jetty;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine;
import org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine;
import org.skywalking.apm.collector.agentstream.jetty.handler.TraceSegmentServletHandler;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
@@ -42,6 +44,8 @@ public class AgentStreamJettyModuleDefine extends AgentStreamModuleDefine {
}
@Override public List<Handler> handlerList() {
return null;
List<Handler> handlers = new LinkedList<>();
handlers.add(new TraceSegmentServletHandler());
return handlers;
}
}
package org.skywalking.apm.collector.agentstream.jetty.handler;
import com.google.gson.JsonElement;
import com.google.gson.stream.JsonReader;
import java.io.BufferedReader;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agentstream.jetty.handler.reader.TraceSegment;
import org.skywalking.apm.collector.agentstream.jetty.handler.reader.TraceSegmentJsonReader;
import org.skywalking.apm.collector.agentstream.worker.segment.SegmentParse;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class TraceSegmentServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class);
@Override public String pathSpec() {
return "/segments";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
logger.debug("receive stream segment");
try {
BufferedReader bufferedReader = req.getReader();
read(bufferedReader);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return null;
}
private TraceSegmentJsonReader jsonReader = new TraceSegmentJsonReader();
private void read(BufferedReader bufferedReader) throws IOException {
JsonReader reader = new JsonReader(bufferedReader);
reader.beginArray();
while (reader.hasNext()) {
SegmentParse segmentParse = new SegmentParse();
TraceSegment traceSegment = jsonReader.read(reader);
segmentParse.parse(traceSegment.getGlobalTraceIds(), traceSegment.getTraceSegmentObject());
}
reader.endArray();
}
}
package org.skywalking.apm.collector.agentstream.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import org.skywalking.apm.network.proto.KeyWithStringValue;
/**
* @author pengys5
*/
public class KeyWithStringValueJsonReader implements StreamJsonReader<KeyWithStringValue> {
private static final String KEY = "k";
private static final String VALUE = "v";
@Override public KeyWithStringValue read(JsonReader reader) throws IOException {
KeyWithStringValue.Builder builder = KeyWithStringValue.newBuilder();
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case KEY:
builder.setKey(reader.nextString());
break;
case VALUE:
builder.setValue(reader.nextString());
break;
default:
reader.skipValue();
break;
}
}
reader.endObject();
return builder.build();
}
}
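On the wire, each key/value pair this reader consumes is a two-field object, for example (illustrative): {"k": "db.statement", "v": "select * from user"}. Unknown fields are skipped, which keeps the format forward compatible.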
package org.skywalking.apm.collector.agentstream.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import org.skywalking.apm.network.proto.LogMessage;
/**
* @author pengys5
*/
public class LogJsonReader implements StreamJsonReader<LogMessage> {
private KeyWithStringValueJsonReader keyWithStringValueJsonReader = new KeyWithStringValueJsonReader();
private static final String TIME = "ti";
private static final String LOG_DATA = "ld";
@Override public LogMessage read(JsonReader reader) throws IOException {
LogMessage.Builder builder = LogMessage.newBuilder();
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case TIME:
builder.setTime(reader.nextLong());
break;
case LOG_DATA:
reader.beginArray();
while (reader.hasNext()) {
builder.addData(keyWithStringValueJsonReader.read(reader));
}
reader.endArray();
break;
default:
reader.skipValue();
break;
}
}
reader.endObject();
return builder.build();
}
}
package org.skywalking.apm.collector.agentstream.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import org.skywalking.apm.network.proto.TraceSegmentReference;
/**
* @author pengys5
*/
public class ReferenceJsonReader implements StreamJsonReader<TraceSegmentReference> {
private UniqueIdJsonReader uniqueIdJsonReader = new UniqueIdJsonReader();
private static final String PARENT_TRACE_SEGMENT_ID = "ts";
private static final String PARENT_APPLICATION_ID = "ai";
private static final String PARENT_SPAN_ID = "si";
private static final String PARENT_SERVICE_ID = "vi";
private static final String PARENT_SERVICE_NAME = "vn";
private static final String NETWORK_ADDRESS_ID = "ni";
private static final String NETWORK_ADDRESS = "nn";
private static final String ENTRY_APPLICATION_INSTANCE_ID = "ea";
private static final String ENTRY_SERVICE_ID = "ei";
private static final String ENTRY_SERVICE_NAME = "en";
private static final String REF_TYPE_VALUE = "rv";
@Override public TraceSegmentReference read(JsonReader reader) throws IOException {
TraceSegmentReference.Builder builder = TraceSegmentReference.newBuilder();
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case PARENT_TRACE_SEGMENT_ID:
builder.setParentTraceSegmentId(uniqueIdJsonReader.read(reader));
break;
case PARENT_APPLICATION_ID:
builder.setParentApplicationInstanceId(reader.nextInt());
break;
case PARENT_SPAN_ID:
builder.setParentSpanId(reader.nextInt());
break;
case PARENT_SERVICE_ID:
builder.setParentServiceId(reader.nextInt());
break;
case PARENT_SERVICE_NAME:
builder.setParentServiceName(reader.nextString());
break;
case NETWORK_ADDRESS_ID:
builder.setNetworkAddressId(reader.nextInt());
break;
case NETWORK_ADDRESS:
builder.setNetworkAddress(reader.nextString());
break;
case ENTRY_APPLICATION_INSTANCE_ID:
builder.setEntryApplicationInstanceId(reader.nextInt());
break;
case ENTRY_SERVICE_ID:
builder.setEntryServiceId(reader.nextInt());
break;
case ENTRY_SERVICE_NAME:
builder.setEntryServiceName(reader.nextString());
break;
case REF_TYPE_VALUE:
builder.setRefTypeValue(reader.nextInt());
break;
default:
reader.skipValue();
break;
}
}
reader.endObject();
return builder.build();
}
}
package org.skywalking.apm.collector.agentstream.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentJsonReader implements StreamJsonReader<TraceSegmentObject> {
private final Logger logger = LoggerFactory.getLogger(SegmentJsonReader.class);
private UniqueIdJsonReader uniqueIdJsonReader = new UniqueIdJsonReader();
private ReferenceJsonReader referenceJsonReader = new ReferenceJsonReader();
private SpanJsonReader spanJsonReader = new SpanJsonReader();
private static final String TRACE_SEGMENT_ID = "ts";
private static final String APPLICATION_ID = "ai";
private static final String APPLICATION_INSTANCE_ID = "ii";
private static final String TRACE_SEGMENT_REFERENCE = "rs";
private static final String SPANS = "ss";
@Override public TraceSegmentObject read(JsonReader reader) throws IOException {
TraceSegmentObject.Builder builder = TraceSegmentObject.newBuilder();
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case TRACE_SEGMENT_ID:
builder.setTraceSegmentId(uniqueIdJsonReader.read(reader));
if (logger.isDebugEnabled()) {
StringBuilder segmentId = new StringBuilder();
builder.getTraceSegmentId().getIdPartsList().forEach(idPart -> segmentId.append(idPart));
logger.debug("segment id: {}", segmentId);
}
break;
case APPLICATION_ID:
builder.setApplicationId(reader.nextInt());
break;
case APPLICATION_INSTANCE_ID:
builder.setApplicationInstanceId(reader.nextInt());
break;
case TRACE_SEGMENT_REFERENCE:
reader.beginArray();
while (reader.hasNext()) {
builder.addRefs(referenceJsonReader.read(reader));
}
reader.endArray();
break;
case SPANS:
reader.beginArray();
while (reader.hasNext()) {
builder.addSpans(spanJsonReader.read(reader));
}
reader.endArray();
break;
default:
reader.skipValue();
break;
}
}
reader.endObject();
return builder.build();
}
}
package org.skywalking.apm.collector.agentstream.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import org.skywalking.apm.network.proto.SpanObject;
/**
* @author pengys5
*/
public class SpanJsonReader implements StreamJsonReader<SpanObject> {
private KeyWithStringValueJsonReader keyWithStringValueJsonReader = new KeyWithStringValueJsonReader();
private LogJsonReader logJsonReader = new LogJsonReader();
private static final String SPAN_ID = "si";
private static final String SPAN_TYPE_VALUE = "tv";
private static final String SPAN_LAYER_VALUE = "lv";
private static final String PARENT_SPAN_ID = "ps";
private static final String START_TIME = "st";
private static final String END_TIME = "et";
private static final String COMPONENT_ID = "ci";
private static final String COMPONENT_NAME = "cn";
private static final String OPERATION_NAME_ID = "oi";
private static final String OPERATION_NAME = "on";
private static final String PEER_ID = "pi";
private static final String PEER = "pn";
private static final String IS_ERROR = "ie";
private static final String TAGS = "to";
private static final String LOGS = "lo";
@Override public SpanObject read(JsonReader reader) throws IOException {
SpanObject.Builder builder = SpanObject.newBuilder();
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case SPAN_ID:
builder.setSpanId(reader.nextInt());
break;
case SPAN_TYPE_VALUE:
builder.setSpanTypeValue(reader.nextInt());
break;
case SPAN_LAYER_VALUE:
builder.setSpanLayerValue(reader.nextInt());
break;
case PARENT_SPAN_ID:
builder.setParentSpanId(reader.nextInt());
break;
case START_TIME:
builder.setStartTime(reader.nextLong());
break;
case END_TIME:
builder.setEndTime(reader.nextLong());
break;
case COMPONENT_ID:
builder.setComponentId(reader.nextInt());
break;
case COMPONENT_NAME:
builder.setComponent(reader.nextString());
break;
case OPERATION_NAME_ID:
builder.setOperationNameId(reader.nextInt());
break;
case OPERATION_NAME:
builder.setOperationName(reader.nextString());
break;
case PEER_ID:
builder.setPeerId(reader.nextInt());
break;
case PEER:
builder.setPeer(reader.nextString());
break;
case IS_ERROR:
builder.setIsError(reader.nextBoolean());
break;
case TAGS:
reader.beginArray();
while (reader.hasNext()) {
builder.addTags(keyWithStringValueJsonReader.read(reader));
}
reader.endArray();
break;
case LOGS:
reader.beginArray();
while (reader.hasNext()) {
builder.addLogs(logJsonReader.read(reader));
}
reader.endArray();
break;
default:
reader.skipValue();
break;
}
}
reader.endObject();
return builder.build();
}
}
package org.skywalking.apm.collector.agentstream.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
/**
* @author pengys5
*/
public interface StreamJsonReader<T> {
T read(JsonReader reader) throws IOException;
}
package org.skywalking.apm.collector.agentstream.jetty.handler.reader;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.UniqueId;
/**
* @author pengys5
*/
public class TraceSegment {
private List<UniqueId> uniqueIds;
private TraceSegmentObject traceSegmentObject;
public TraceSegment() {
uniqueIds = new ArrayList<>();
}
public List<UniqueId> getGlobalTraceIds() {
return uniqueIds;
}
public void addGlobalTraceId(UniqueId globalTraceId) {
uniqueIds.add(globalTraceId);
}
public TraceSegmentObject getTraceSegmentObject() {
return traceSegmentObject;
}
public void setTraceSegmentObject(TraceSegmentObject traceSegmentObject) {
this.traceSegmentObject = traceSegmentObject;
}
}
package org.skywalking.apm.collector.agentstream.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class TraceSegmentJsonReader implements StreamJsonReader<TraceSegment> {
private final Logger logger = LoggerFactory.getLogger(TraceSegmentJsonReader.class);
private UniqueIdJsonReader uniqueIdJsonReader = new UniqueIdJsonReader();
private SegmentJsonReader segmentJsonReader = new SegmentJsonReader();
private static final String GLOBAL_TRACE_IDS = "gt";
private static final String SEGMENT = "sg";
@Override public TraceSegment read(JsonReader reader) throws IOException {
TraceSegment traceSegment = new TraceSegment();
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case GLOBAL_TRACE_IDS:
reader.beginArray();
while (reader.hasNext()) {
traceSegment.addGlobalTraceId(uniqueIdJsonReader.read(reader));
}
reader.endArray();
if (logger.isDebugEnabled()) {
traceSegment.getGlobalTraceIds().forEach(uniqueId -> {
StringBuilder globalTraceId = new StringBuilder();
uniqueId.getIdPartsList().forEach(idPart -> globalTraceId.append(idPart));
logger.debug("global trace id: {}", globalTraceId.toString());
});
}
break;
case SEGMENT:
traceSegment.setTraceSegmentObject(segmentJsonReader.read(reader));
break;
default:
reader.skipValue();
break;
}
}
reader.endObject();
return traceSegment;
}
}
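Putting the readers together: /segments accepts a JSON array of trace segments, each keyed by the abbreviations defined above ("gt" = global trace ids, "sg" = segment object, and inside it "ts"/"ai"/"ii"/"rs"/"ss"). A skeleton of one element with invented values (a UniqueId is just a flat array of long parts, which the debug logging concatenates into one id string):
[{
"gt": [[230150, 185809, 24040000]],
"sg": {
"ts": [230150, 185809, 24040001],
"ai": 2,
"ii": 17,
"rs": [],
"ss": [{
"si": 0, "tv": 0, "lv": 3, "ps": -1,
"st": 1501858094526, "et": 1501858096000,
"ci": 1, "cn": "Tomcat", "oi": 0, "on": "/portal",
"pi": 0, "pn": "", "ie": false,
"to": [{"k": "url", "v": "http://localhost:8080/portal"}],
"lo": []
}]
}
}]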
package org.skywalking.apm.collector.agentstream.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import org.skywalking.apm.network.proto.UniqueId;
/**
* @author pengys5
*/
public class UniqueIdJsonReader implements StreamJsonReader<UniqueId> {
@Override public UniqueId read(JsonReader reader) throws IOException {
UniqueId.Builder builder = UniqueId.newBuilder();
reader.beginArray();
while (reader.hasNext()) {
builder.addIdParts(reader.nextLong());
}
reader.endArray();
return builder.build();
}
}
@@ -7,6 +7,5 @@ public class CommonTable {
public static final String TABLE_TYPE = "type";
public static final String COLUMN_ID = "id";
public static final String COLUMN_AGG = "agg";
public static final String COLUMN_EXCHANGE_TIMES = "exchange_times";
public static final String COLUMN_TIME_BUCKET = "time_bucket";
}
@@ -8,6 +8,7 @@ public class Const {
public static final String IDS_SPLIT = "\\.\\.-\\.\\.";
public static final String PEERS_FRONT_SPLIT = "[";
public static final String PEERS_BEHIND_SPLIT = "]";
public static final int USER_ID = 1;
public static final String USER_CODE = "User";
public static final String SEGMENT_SPAN_SPLIT = "S";
}
package org.skywalking.apm.collector.agentstream.worker.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
/**
* @author pengys5
*/
public class ApplicationCache {
private static Cache<String, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(1000).build();
public static int get(String applicationCode) {
try {
return CACHE.get(applicationCode, () -> {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
return dao.getApplicationId(applicationCode);
});
} catch (Throwable e) {
return 0;
}
}
}
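A caveat worth spelling out for the cache above: Guava's Cache.get(key, loader) stores whatever the loader returns, so an unregistered code that loads as 0 stays cached as 0 until the entry is evicted, and any loader exception is collapsed to 0 by the catch block. A minimal self-contained sketch of the same load-through pattern, with the DAO replaced by a hypothetical stub:
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
 * Illustrative sketch only; lookupInStorage stands in for IApplicationDAO.getApplicationId.
 */
public class LoadThroughCacheSketch {
private static final Cache<String, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(1000).build();
static int lookupInStorage(String applicationCode) {
return 0; // 0 means "not registered yet"
}
public static int get(String applicationCode) {
try {
// Loads through the stub on a miss; the result (even 0) is cached.
return CACHE.get(applicationCode, () -> lookupInStorage(applicationCode));
} catch (Throwable e) {
return 0; // loader failures also collapse to "unknown"
}
}
public static void main(String[] args) {
System.out.println(get("portal-service")); // prints 0 until a real id is stored
}
}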
package org.skywalking.apm.collector.agentstream.worker.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
/**
* @author pengys5
*/
public class InstanceCache {
private static Cache<Integer, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(1000).build();
public static int get(int applicationInstanceId) {
try {
return CACHE.get(applicationInstanceId, () -> {
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
return dao.getApplicationId(applicationInstanceId);
});
} catch (Throwable e) {
return 0;
}
}
}
@@ -3,21 +3,21 @@ package org.skywalking.apm.collector.agentstream.worker.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.node.component.dao.INodeComponentDAO;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
/**
* @author pengys5
*/
public class ComponentCache {
public class ServiceNameCache {
private static Cache<String, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(1000).build();
private static Cache<String, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(2000).build();
public static int get(int applicationId, String componentName) {
public static int get(int applicationId, String serviceName) {
try {
return CACHE.get(applicationId + Const.ID_SPLIT + componentName, () -> {
INodeComponentDAO dao = (INodeComponentDAO)DAOContainer.INSTANCE.get(INodeComponentDAO.class.getName());
return dao.getComponentId(applicationId, componentName);
return CACHE.get(applicationId + Const.ID_SPLIT + serviceName, () -> {
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
return dao.getServiceId(applicationId, serviceName);
});
} catch (Throwable e) {
return 0;
......
package org.skywalking.apm.collector.agentstream.worker.global;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.global.dao.IGlobalTraceDAO;
import org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
@@ -10,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
@@ -28,9 +26,12 @@ public class GlobalTracePersistenceWorker extends PersistenceWorker {
super.preStart();
}
@Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
IGlobalTraceDAO dao = (IGlobalTraceDAO)DAOContainer.INSTANCE.get(IGlobalTraceDAO.class.getName());
return dao.prepareBatch(dataMap);
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IGlobalTraceDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<GlobalTracePersistenceWorker> {
......
package org.skywalking.apm.collector.agentstream.worker.global.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class GlobalTraceEsDAO extends EsDAO implements IGlobalTraceDAO {
public class GlobalTraceEsDAO extends EsDAO implements IGlobalTraceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
private final Logger logger = LoggerFactory.getLogger(GlobalTraceEsDAO.class);
@Override public List<?> prepareBatch(Map<String, Data> dataMap) {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
logger.debug("global trace prepareBatch, id: {}", id);
Map<String, Object> source = new HashMap();
source.put(GlobalTraceTable.COLUMN_SEGMENT_ID, data.getDataString(1));
source.put(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, data.getDataString(2));
source.put(GlobalTraceTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
logger.debug("global trace source: {}", source.toString());
IndexRequestBuilder builder = getClient().prepareIndex(GlobalTraceTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(GlobalTraceTable.COLUMN_SEGMENT_ID, data.getDataString(1));
source.put(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, data.getDataString(2));
source.put(GlobalTraceTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
logger.debug("global trace source: {}", source.toString());
return getClient().prepareIndex(GlobalTraceTable.TABLE, data.getDataString(0)).setSource(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.global.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class GlobalTraceH2DAO extends H2DAO implements IGlobalTraceDAO {
@Override public List<?> prepareBatch(Map map) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.global.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface IGlobalTraceDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
}
@@ -14,10 +14,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class GlobalTraceDataDefine extends DataDefine {
@Override public int defineId() {
return 403;
}
@Override protected int initialCapacity() {
return 4;
}
......
package org.skywalking.apm.collector.agentstream.worker.node.component;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.node.component.dao.INodeComponentDAO;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
@@ -10,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
@@ -28,9 +26,12 @@ public class NodeComponentPersistenceWorker extends PersistenceWorker {
super.preStart();
}
@Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
INodeComponentDAO dao = (INodeComponentDAO)DAOContainer.INSTANCE.get(INodeComponentDAO.class.getName());
return dao.prepareBatch(dataMap);
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(INodeComponentDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeComponentPersistenceWorker> {
......
@@ -3,85 +3,89 @@ package org.skywalking.apm.collector.agentstream.worker.node.component;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.cache.ComponentCache;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener {
public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener, LocalSpanListener {
private final Logger logger = LoggerFactory.getLogger(NodeComponentSpanListener.class);
private List<NodeComponentDataDefine.NodeComponent> nodeComponents = new ArrayList<>();
private List<String> nodeComponents = new ArrayList<>();
private long timeBucket;
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String componentName = ComponentsDefine.getInstance().getComponentName(spanObject.getComponentId());
createNodeComponent(spanObject, applicationId, componentName);
String componentName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getComponentId());
if (spanObject.getComponentId() == 0) {
componentName = spanObject.getComponent();
}
String peer = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getPeerId());
if (spanObject.getPeerId() == 0) {
peer = spanObject.getPeer();
}
String agg = peer + Const.ID_SPLIT + componentName;
nodeComponents.add(agg);
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String componentName = ComponentsDefine.getInstance().getComponentName(spanObject.getComponentId());
createNodeComponent(spanObject, applicationId, componentName);
buildEntryOrLocal(spanObject, applicationId);
}
@Override
public void parseLocal(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
int componentId = ComponentCache.get(applicationId, spanObject.getComponent());
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setApplicationId(applicationId);
nodeComponent.setComponentId(componentId);
nodeComponent.setComponentName(spanObject.getComponent());
buildEntryOrLocal(spanObject, applicationId);
}
if (componentId == 0) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
private void buildEntryOrLocal(SpanObject spanObject, int applicationId) {
String componentName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getComponentId());
logger.debug("send to node component exchange worker, id: {}", nodeComponent.getId());
nodeComponent.setId(applicationId + Const.ID_SPLIT + spanObject.getComponent());
try {
context.getClusterWorkerContext().lookup(NodeComponentExchangeWorker.WorkerRole.INSTANCE).tell(nodeComponent.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
} else {
nodeComponent.setId(applicationId + Const.ID_SPLIT + componentId);
nodeComponents.add(nodeComponent);
if (spanObject.getComponentId() == 0) {
componentName = spanObject.getComponent();
}
String peer = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
String agg = peer + Const.ID_SPLIT + componentName;
nodeComponents.add(agg);
}
private void createNodeComponent(SpanObject spanObject, int applicationId, String componentName) {
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setApplicationId(applicationId);
nodeComponent.setComponentId(spanObject.getComponentId());
nodeComponent.setComponentName(componentName);
nodeComponent.setId(applicationId + Const.ID_SPLIT + spanObject.getComponentId());
nodeComponents.add(nodeComponent);
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
}
@Override public void build() {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
for (NodeComponentDataDefine.NodeComponent nodeComponent : nodeComponents) {
nodeComponents.forEach(agg -> {
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setId(timeBucket + Const.ID_SPLIT + agg);
nodeComponent.setAgg(agg);
nodeComponent.setTimeBucket(timeBucket);
try {
logger.debug("send to node component aggregation worker, id: {}", nodeComponent.getId());
context.getClusterWorkerContext().lookup(NodeComponentAggregationWorker.WorkerRole.INSTANCE).tell(nodeComponent.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
});
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface INodeComponentDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
int getComponentId(int applicationId, String componentName);
}
package org.skywalking.apm.collector.agentstream.worker.node.component.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentTable;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO {
public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
@Override public List<?> prepareBatch(Map<String, Data> dataMap) {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
Map<String, Object> source = new HashMap();
source.put(NodeComponentTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(NodeComponentTable.COLUMN_COMPONENT_NAME, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_COMPONENT_ID, data.getDataInteger(1));
IndexRequestBuilder builder = getClient().prepareIndex(NodeComponentTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
@Override public Data get(String id, DataDefine dataDefine) {
GetResponse getResponse = getClient().prepareGet(NodeComponentTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataString(1, (String)source.get(NodeComponentTable.COLUMN_AGG));
data.setDataLong(0, (Long)source.get(NodeComponentTable.COLUMN_TIME_BUCKET));
return data;
} else {
return null;
}
}
public int getComponentId(int applicationId, String componentName) {
ElasticSearchClient client = getClient();
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(NodeComponentTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termQuery(NodeComponentTable.COLUMN_APPLICATION_ID, applicationId));
boolQueryBuilder.must(QueryBuilders.termQuery(NodeComponentTable.COLUMN_COMPONENT_NAME, componentName));
searchRequestBuilder.setQuery(boolQueryBuilder);
searchRequestBuilder.setSize(1);
return getClient().prepareIndex(NodeComponentTable.TABLE, data.getDataString(0)).setSource(source);
}
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
SearchHit searchHit = searchResponse.getHits().iterator().next();
int componentId = (int)searchHit.getSource().get(NodeComponentTable.COLUMN_COMPONENT_ID);
return componentId;
}
return 0;
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareUpdate(NodeComponentTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
@@ -9,11 +7,4 @@ import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
*/
public class NodeComponentH2DAO extends H2DAO implements INodeComponentDAO {
@Override public List<?> prepareBatch(Map map) {
return null;
}
@Override public int getComponentId(int applicationId, String componentName) {
return 0;
}
}
@@ -5,7 +5,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.Exchange;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
@@ -15,20 +14,14 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class NodeComponentDataDefine extends DataDefine {
@Override public int defineId() {
return 101;
}
@Override protected int initialCapacity() {
return 5;
return 3;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(NodeComponentTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(NodeComponentTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(NodeComponentTable.COLUMN_COMPONENT_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(NodeComponentTable.COLUMN_COMPONENT_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(4, new Attribute(NodeComponentTable.COLUMN_EXCHANGE_TIMES, AttributeType.INTEGER, new NonOperation()));
addAttribute(1, new Attribute(NodeComponentTable.COLUMN_AGG, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(NodeComponentTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
@@ -39,41 +32,33 @@ public class NodeComponentDataDefine extends DataDefine {
return null;
}
public static class NodeComponent extends Exchange implements Transform<NodeComponent> {
public static class NodeComponent implements Transform<NodeComponent> {
private String id;
private int applicationId;
private String componentName;
private int componentId;
private String agg;
private long timeBucket;
public NodeComponent(String id, int applicationId, String componentName, int componentId) {
super(0);
NodeComponent(String id, String agg, long timeBucket) {
this.id = id;
this.applicationId = applicationId;
this.componentName = componentName;
this.componentId = componentId;
this.agg = agg;
this.timeBucket = timeBucket;
}
public NodeComponent() {
super(0);
}
@Override public Data toData() {
NodeComponentDataDefine define = new NodeComponentDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationId);
data.setDataString(1, this.componentName);
data.setDataInteger(1, this.componentId);
data.setDataInteger(2, this.getTimes());
data.setDataString(1, this.agg);
data.setDataLong(0, this.timeBucket);
return data;
}
@Override public NodeComponent toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationId = data.getDataInteger(0);
this.componentName = data.getDataString(1);
this.componentId = data.getDataInteger(1);
this.setTimes(data.getDataInteger(2));
this.agg = data.getDataString(1);
this.timeBucket = data.getDataLong(0);
return this;
}
@@ -81,32 +66,24 @@ public class NodeComponentDataDefine extends DataDefine {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getComponentName() {
return componentName;
}
public void setComponentName(String componentName) {
this.componentName = componentName;
public String getAgg() {
return agg;
}
public int getComponentId() {
return componentId;
public long getTimeBucket() {
return timeBucket;
}
public void setComponentId(int componentId) {
this.componentId = componentId;
public void setId(String id) {
this.id = id;
}
public int getApplicationId() {
return applicationId;
public void setAgg(String agg) {
this.agg = agg;
}
public void setApplicationId(int applicationId) {
this.applicationId = applicationId;
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
}
}
@@ -25,8 +25,7 @@ public class NodeComponentEsTableDefine extends ElasticSearchTableDefine {
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
@@ -14,8 +14,7 @@ public class NodeComponentH2TableDefine extends H2TableDefine {
@Override public void initialize() {
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_AGG, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
@@ -7,7 +7,4 @@ import org.skywalking.apm.collector.agentstream.worker.CommonTable;
*/
public class NodeComponentTable extends CommonTable {
public static final String TABLE = "node_component";
public static final String COLUMN_APPLICATION_ID = "application_id";
public static final String COLUMN_COMPONENT_NAME = "component_name";
public static final String COLUMN_COMPONENT_ID = "component_id";
}
package org.skywalking.apm.collector.agentstream.worker.node.mapping;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.INodeMappingDAO;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
@@ -10,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
@@ -28,9 +26,12 @@ public class NodeMappingPersistenceWorker extends PersistenceWorker {
super.preStart();
}
@Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
INodeMappingDAO dao = (INodeMappingDAO)DAOContainer.INSTANCE.get(INodeMappingDAO.class.getName());
return dao.prepareBatch(dataMap);
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(INodeMappingDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMappingPersistenceWorker> {
......
@@ -6,6 +6,7 @@ import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
@@ -30,12 +31,12 @@ public class NodeMappingSpanListener implements RefsListener, FirstSpanListener
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
logger.debug("node mapping listener parse reference");
String peers = Const.PEERS_FRONT_SPLIT + reference.getNetworkAddressId() + Const.PEERS_BEHIND_SPLIT;
if (reference.getNetworkAddressId() == 0) {
peers = Const.PEERS_FRONT_SPLIT + reference.getNetworkAddress() + Const.PEERS_BEHIND_SPLIT;
String peers = reference.getNetworkAddress();
if (reference.getNetworkAddressId() != 0) {
peers = ExchangeMarkUtils.INSTANCE.buildMarkedID(reference.getNetworkAddressId());
}
String agg = applicationId + Const.ID_SPLIT + peers;
String agg = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId) + Const.ID_SPLIT + peers;
nodeMappings.add(agg);
}
......
package org.skywalking.apm.collector.agentstream.worker.node.mapping.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface INodeMappingDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.node.mapping.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class NodeMappingEsDAO extends EsDAO implements INodeMappingDAO {
public class NodeMappingEsDAO extends EsDAO implements INodeMappingDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
@Override public List<?> prepareBatch(Map<String, Data> dataMap) {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
Map<String, Object> source = new HashMap();
source.put(NodeMappingTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeMappingTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
IndexRequestBuilder builder = getClient().prepareIndex(NodeMappingTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
}
@Override public Data get(String id, DataDefine dataDefine) {
GetResponse getResponse = getClient().prepareGet(NodeMappingTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataString(1, (String)source.get(NodeMappingTable.COLUMN_AGG));
data.setDataLong(0, (Long)source.get(NodeMappingTable.COLUMN_TIME_BUCKET));
return data;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeMappingTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeMappingTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareIndex(NodeMappingTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeMappingTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeMappingTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareUpdate(NodeMappingTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
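The IndexRequestBuilder/UpdateRequestBuilder values produced by prepareBatchInsert and prepareBatchUpdate are meant to be flushed in one Elasticsearch bulk round trip. A sketch of that consuming side, assuming the ES 5.x transport Client; the executor class itself is invented for illustration:

import java.util.List;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;

class EsBatchExecutorSketch {
    // Flush a mixed batch of insert and update builders in one bulk call.
    void execute(Client client, List<Object> requests) {
        BulkRequestBuilder bulk = client.prepareBulk();
        for (Object request : requests) {
            if (request instanceof IndexRequestBuilder) {
                bulk.add((IndexRequestBuilder)request);      // new row
            } else if (request instanceof UpdateRequestBuilder) {
                bulk.add((UpdateRequestBuilder)request);     // merged row
            }
        }
        BulkResponse response = bulk.get();
        if (response.hasFailures()) {
            throw new IllegalStateException(response.buildFailureMessage());
        }
    }
}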
package org.skywalking.apm.collector.agentstream.worker.node.mapping.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class NodeMappingH2DAO extends H2DAO implements INodeMappingDAO {
@Override public List<?> prepareBatch(Map map) {
return null;
}
}
......@@ -14,10 +14,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class NodeMappingDataDefine extends DataDefine {
@Override public int defineId() {
return 102;
}
@Override protected int initialCapacity() {
return 3;
}
......@@ -36,12 +32,12 @@ public class NodeMappingDataDefine extends DataDefine {
return null;
}
public static class NodeMapping implements Transform {
public static class NodeMapping implements Transform<NodeMapping> {
private String id;
private String agg;
private long timeBucket;
public NodeMapping(String id, String agg, long timeBucket) {
NodeMapping(String id, String agg, long timeBucket) {
this.id = id;
this.agg = agg;
this.timeBucket = timeBucket;
......@@ -59,8 +55,11 @@ public class NodeMappingDataDefine extends DataDefine {
return data;
}
@Override public Object toSelf(Data data) {
return null;
}
@Override public NodeMapping toSelf(Data data) {
this.id = data.getDataString(0);
this.agg = data.getDataString(1);
this.timeBucket = data.getDataLong(0);
return this;
}
public String getId() {
......
package org.skywalking.apm.collector.agentstream.worker.noderef.reference;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.INodeReferenceDAO;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......@@ -10,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......@@ -28,9 +26,12 @@ public class NodeRefPersistenceWorker extends PersistenceWorker {
super.preStart();
}
@Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
INodeReferenceDAO dao = (INodeReferenceDAO)DAOContainer.INSTANCE.get(INodeReferenceDAO.class.getName());
return dao.prepareBatch(dataMap);
}
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(INodeReferenceDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefPersistenceWorker> {
......
......@@ -3,11 +3,13 @@ package org.skywalking.apm.collector.agentstream.worker.noderef.reference;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.cache.InstanceCache;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
......@@ -26,28 +28,27 @@ public class NodeRefSpanListener implements EntrySpanListener, ExitSpanListener,
private final Logger logger = LoggerFactory.getLogger(NodeRefSpanListener.class);
private List<String> nodeExitReferences = new ArrayList<>();
private List<String> nodeReferences = new ArrayList<>();
private List<String> nodeEntryReferences = new ArrayList<>();
private long timeBucket;
private boolean hasReference = false;
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String front = String.valueOf(applicationId);
String behind = String.valueOf(spanObject.getPeerId());
if (spanObject.getPeerId() == 0) {
behind = spanObject.getPeer();
}
String front = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
String behind = spanObject.getPeer();
if (spanObject.getPeerId() != 0) {
behind = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getPeerId());
}
String agg = front + Const.ID_SPLIT + behind;
nodeExitReferences.add(agg);
nodeReferences.add(agg);
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String behind = String.valueOf(applicationId);
String front = Const.USER_CODE;
String behind = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
String front = ExchangeMarkUtils.INSTANCE.buildMarkedID(Const.USER_ID);
String agg = front + Const.ID_SPLIT + behind;
nodeEntryReferences.add(agg);
}
......@@ -59,6 +60,14 @@ public class NodeRefSpanListener implements EntrySpanListener, ExitSpanListener,
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
int parentApplicationId = InstanceCache.get(reference.getParentApplicationInstanceId());
String front = ExchangeMarkUtils.INSTANCE.buildMarkedID(parentApplicationId);
String behind = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
String agg = front + Const.ID_SPLIT + behind;
nodeReferences.add(agg);
hasReference = true;
}
......@@ -66,10 +75,10 @@ public class NodeRefSpanListener implements EntrySpanListener, ExitSpanListener,
logger.debug("node reference listener build");
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
if (!hasReference) {
nodeExitReferences.addAll(nodeEntryReferences);
nodeReferences.addAll(nodeEntryReferences);
}
for (String agg : nodeExitReferences) {
for (String agg : nodeReferences) {
NodeRefDataDefine.NodeReference nodeReference = new NodeRefDataDefine.NodeReference();
nodeReference.setId(timeBucket + Const.ID_SPLIT + agg);
nodeReference.setAgg(agg);
......
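Every listener path now emits the same key shape, front + Const.ID_SPLIT + behind, where each side is either a marked id or a raw peer string. A worked sketch of the key variants this listener can produce; the separator and marker values are placeholders, not the real constants:

// ID_SPLIT and the "M" marker are assumed stand-ins for Const.ID_SPLIT and
// ExchangeMarkUtils; USER_ID is assumed to be 1.
class NodeRefKeySketch {
    static final String ID_SPLIT = "..-..";
    static String mark(int id) { return "M" + id; }

    public static void main(String[] args) {
        System.out.println(mark(2) + ID_SPLIT + mark(5));        // exit span, peerId registered
        System.out.println(mark(2) + ID_SPLIT + "10.0.0.1:80");  // exit span, raw peer string
        System.out.println(mark(1) + ID_SPLIT + mark(2));        // entry span, user node -> application
        System.out.println(mark(4) + ID_SPLIT + mark(2));        // segment ref, parent app -> application
    }
}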
package org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface INodeReferenceDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class NodeReferenceEsDAO extends EsDAO implements INodeReferenceDAO {
public class NodeReferenceEsDAO extends EsDAO implements INodeReferenceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
@Override public List<?> prepareBatch(Map<String, Data> dataMap) {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
Map<String, Object> source = new HashMap();
source.put(NodeRefTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
IndexRequestBuilder builder = getClient().prepareIndex(NodeRefTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
}
@Override public Data get(String id, DataDefine dataDefine) {
GetResponse getResponse = getClient().prepareGet(NodeRefTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataString(1, (String)source.get(NodeRefTable.COLUMN_AGG));
data.setDataLong(0, (Long)source.get(NodeRefTable.COLUMN_TIME_BUCKET));
return data;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeRefTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareIndex(NodeRefTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeRefTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareUpdate(NodeRefTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class NodeReferenceH2DAO extends H2DAO implements INodeReferenceDAO {
public class NodeReferenceH2DAO extends H2DAO implements INodeReferenceDAO, IPersistenceDAO<String, String> {
@Override public List<?> prepareBatch(Map map) {
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public String prepareBatchInsert(Data data) {
return null;
}
@Override public String prepareBatchUpdate(Data data) {
return null;
}
}
......@@ -14,12 +14,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class NodeRefDataDefine extends DataDefine {
public static final int DEFINE_ID = 201;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 3;
}
......@@ -51,7 +45,7 @@ public class NodeRefDataDefine extends DataDefine {
private String agg;
private long timeBucket;
public NodeReference(String id, String agg, long timeBucket) {
NodeReference(String id, String agg, long timeBucket) {
this.id = id;
this.agg = agg;
this.timeBucket = timeBucket;
......@@ -70,7 +64,10 @@ public class NodeRefDataDefine extends DataDefine {
}
@Override public Object toSelf(Data data) {
return null;
this.id = data.getDataString(0);
this.agg = data.getDataString(1);
this.timeBucket = data.getDataLong(0);
return this;
}
public String getId() {
......
package org.skywalking.apm.collector.agentstream.worker.noderef.summary;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.INodeRefSumDAO;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......@@ -10,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......@@ -28,9 +26,12 @@ public class NodeRefSumPersistenceWorker extends PersistenceWorker {
super.preStart();
}
@Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
INodeRefSumDAO dao = (INodeRefSumDAO)DAOContainer.INSTANCE.get(INodeRefSumDAO.class.getName());
return dao.prepareBatch(dataMap);
}
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(INodeRefSumDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefSumPersistenceWorker> {
......
......@@ -3,11 +3,13 @@ package org.skywalking.apm.collector.agentstream.worker.noderef.summary;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.cache.InstanceCache;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
......@@ -28,15 +30,19 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
private List<NodeRefSumDataDefine.NodeReferenceSum> nodeExitReferences = new ArrayList<>();
private List<NodeRefSumDataDefine.NodeReferenceSum> nodeEntryReferences = new ArrayList<>();
private List<String> nodeReferences = new ArrayList<>();
private long timeBucket;
private boolean hasReference = false;
private long startTime;
private long endTime;
private boolean isError;
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String front = String.valueOf(applicationId);
String behind = String.valueOf(spanObject.getPeerId());
if (spanObject.getPeerId() == 0) {
behind = spanObject.getPeer();
}
String front = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
String behind = spanObject.getPeer();
if (spanObject.getPeerId() != 0) {
behind = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getPeerId());
}
String agg = front + Const.ID_SPLIT + behind;
......@@ -45,9 +51,8 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String behind = String.valueOf(applicationId);
String front = Const.USER_CODE;
String behind = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
String front = ExchangeMarkUtils.INSTANCE.buildMarkedID(Const.USER_ID);
String agg = front + Const.ID_SPLIT + behind;
nodeEntryReferences.add(buildNodeRefSum(spanObject.getStartTime(), spanObject.getEndTime(), agg, spanObject.getIsError()));
}
......@@ -76,11 +81,22 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
startTime = spanObject.getStartTime();
endTime = spanObject.getEndTime();
isError = spanObject.getIsError();
}
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
int parentApplicationId = InstanceCache.get(reference.getParentApplicationInstanceId());
String front = ExchangeMarkUtils.INSTANCE.buildMarkedID(parentApplicationId);
String behind = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
String agg = front + Const.ID_SPLIT + behind;
hasReference = true;
nodeReferences.add(agg);
}
@Override public void build() {
......@@ -88,6 +104,10 @@ public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListen
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
if (!hasReference) {
nodeExitReferences.addAll(nodeEntryReferences);
} else {
nodeReferences.forEach(agg -> {
nodeExitReferences.add(buildNodeRefSum(startTime, endTime, agg, isError));
});
}
for (NodeRefSumDataDefine.NodeReferenceSum referenceSum : nodeExitReferences) {
......
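buildNodeRefSum is not expanded in this diff, but the NodeRefSumTable columns imply it classifies one call into a latency bucket plus error and summary counters. A sketch of what such bucketing plausibly looks like, derived from the column names only; the millisecond thresholds are assumptions:

// Hypothetical reconstruction of buildNodeRefSum, based only on the
// NodeRefSumTable column semantics.
class NodeRefSumSketch {
    long oneSecondLess, threeSecondLess, fiveSecondLess, fiveSecondGreater, error, summary;
    String agg;

    static NodeRefSumSketch build(long startTime, long endTime, String agg, boolean isError) {
        NodeRefSumSketch sum = new NodeRefSumSketch();
        sum.agg = agg;
        long cost = endTime - startTime;
        if (isError) {
            sum.error = 1;                // error calls counted separately
        } else if (cost < 1000) {
            sum.oneSecondLess = 1;
        } else if (cost < 3000) {
            sum.threeSecondLess = 1;
        } else if (cost < 5000) {
            sum.fiveSecondLess = 1;
        } else {
            sum.fiveSecondGreater = 1;
        }
        sum.summary = 1;                  // every call counts toward the total
        return sum;
    }
}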
package org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface INodeRefSumDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class NodeRefSumEsDAO extends EsDAO implements INodeRefSumDAO {
public class NodeRefSumEsDAO extends EsDAO implements INodeRefSumDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
@Override public List<?> prepareBatch(Map<String, Data> dataMap) {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
Map<String, Object> source = new HashMap();
source.put(NodeRefSumTable.COLUMN_ONE_SECOND_LESS, data.getDataLong(0));
source.put(NodeRefSumTable.COLUMN_THREE_SECOND_LESS, data.getDataLong(1));
source.put(NodeRefSumTable.COLUMN_FIVE_SECOND_LESS, data.getDataLong(2));
source.put(NodeRefSumTable.COLUMN_FIVE_SECOND_GREATER, data.getDataLong(3));
source.put(NodeRefSumTable.COLUMN_ERROR, data.getDataLong(4));
source.put(NodeRefSumTable.COLUMN_SUMMARY, data.getDataLong(5));
source.put(NodeRefSumTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeRefSumTable.COLUMN_TIME_BUCKET, data.getDataLong(6));
IndexRequestBuilder builder = getClient().prepareIndex(NodeRefSumTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
}
@Override public Data get(String id, DataDefine dataDefine) {
GetResponse getResponse = getClient().prepareGet(NodeRefSumTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataLong(0, ((Number)source.get(NodeRefSumTable.COLUMN_ONE_SECOND_LESS)).longValue());
data.setDataLong(1, ((Number)source.get(NodeRefSumTable.COLUMN_THREE_SECOND_LESS)).longValue());
data.setDataLong(2, ((Number)source.get(NodeRefSumTable.COLUMN_FIVE_SECOND_LESS)).longValue());
data.setDataLong(3, ((Number)source.get(NodeRefSumTable.COLUMN_FIVE_SECOND_GREATER)).longValue());
data.setDataLong(4, ((Number)source.get(NodeRefSumTable.COLUMN_ERROR)).longValue());
data.setDataLong(5, ((Number)source.get(NodeRefSumTable.COLUMN_SUMMARY)).longValue());
data.setDataLong(6, ((Number)source.get(NodeRefSumTable.COLUMN_TIME_BUCKET)).longValue());
data.setDataString(1, (String)source.get(NodeRefSumTable.COLUMN_AGG));
return data;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeRefSumTable.COLUMN_ONE_SECOND_LESS, data.getDataLong(0));
source.put(NodeRefSumTable.COLUMN_THREE_SECOND_LESS, data.getDataLong(1));
source.put(NodeRefSumTable.COLUMN_FIVE_SECOND_LESS, data.getDataLong(2));
source.put(NodeRefSumTable.COLUMN_FIVE_SECOND_GREATER, data.getDataLong(3));
source.put(NodeRefSumTable.COLUMN_ERROR, data.getDataLong(4));
source.put(NodeRefSumTable.COLUMN_SUMMARY, data.getDataLong(5));
source.put(NodeRefSumTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeRefSumTable.COLUMN_TIME_BUCKET, data.getDataLong(6));
return getClient().prepareIndex(NodeRefSumTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeRefSumTable.COLUMN_ONE_SECOND_LESS, data.getDataLong(0));
source.put(NodeRefSumTable.COLUMN_THREE_SECOND_LESS, data.getDataLong(1));
source.put(NodeRefSumTable.COLUMN_FIVE_SECOND_LESS, data.getDataLong(2));
source.put(NodeRefSumTable.COLUMN_FIVE_SECOND_GREATER, data.getDataLong(3));
source.put(NodeRefSumTable.COLUMN_ERROR, data.getDataLong(4));
source.put(NodeRefSumTable.COLUMN_SUMMARY, data.getDataLong(5));
source.put(NodeRefSumTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeRefSumTable.COLUMN_TIME_BUCKET, data.getDataLong(6));
return getClient().prepareUpdate(NodeRefSumTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class NodeRefSumH2DAO extends H2DAO implements INodeRefSumDAO {
@Override public List<?> prepareBatch(Map map) {
return null;
}
}
......@@ -14,12 +14,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class NodeRefSumDataDefine extends DataDefine {
public static final int DEFINE_ID = 202;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 9;
}
......
......@@ -12,18 +12,12 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class ApplicationDataDefine extends DataDefine {
public static final int DEFINE_ID = 101;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 3;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute("id", AttributeType.STRING, new NonOperation()));
addAttribute(0, new Attribute(ApplicationTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(ApplicationTable.COLUMN_APPLICATION_CODE, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(ApplicationTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
}
......
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......@@ -40,8 +41,11 @@ public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker {
if (applicationId == 0) {
int min = dao.getMinApplicationId();
if (min == 0) {
application.setApplicationId(1);
application.setId("1");
ApplicationDataDefine.Application userApplication = new ApplicationDataDefine.Application(String.valueOf(Const.USER_ID), Const.USER_CODE, Const.USER_ID);
dao.save(userApplication);
application.setApplicationId(-1);
application.setId("-1");
} else {
int max = dao.getMaxApplicationId();
applicationId = IdAutoIncrement.INSTANCE.increment(min, max);
......
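When the application table is empty, the worker now reserves a synthetic "user" application (Const.USER_ID/Const.USER_CODE) before handing out the first real id, which becomes -1. A sketch of that registration decision; the constant values and the alternating allocator are assumptions, since IdAutoIncrement is not shown in this diff:

// Sketch of the empty-table registration path; USER_ID = 1 and
// USER_CODE = "User" are assumed values, and increment() is an invented
// stand-in for IdAutoIncrement.
class ApplicationRegisterSketch {
    interface ApplicationDAO {
        int getMinApplicationId();
        int getMaxApplicationId();
        void save(int applicationId, String applicationCode);
    }

    static final int USER_ID = 1;            // assumed value
    static final String USER_CODE = "User";  // assumed value

    int register(ApplicationDAO dao, String applicationCode) {
        int min = dao.getMinApplicationId();
        if (min == 0) {
            // empty table: persist the reserved user application first,
            // then give the first real application the id -1
            dao.save(USER_ID, USER_CODE);
            dao.save(-1, applicationCode);
            return -1;
        }
        int max = dao.getMaxApplicationId();
        int next = increment(min, max);
        dao.save(next, applicationCode);
        return next;
    }

    // One plausible alternating scheme: grow the positive side, then the
    // negative side, keeping ids dense around zero; purely an assumption.
    int increment(int min, int max) {
        return (max + Math.abs(min)) % 2 == 0 ? max + 1 : min - 1;
    }
}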
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class ApplicationTable {
public class ApplicationTable extends CommonTable {
public static final String TABLE = "application";
public static final String COLUMN_APPLICATION_CODE = "application_code";
public static final String COLUMN_APPLICATION_ID = "application_id";
......
......@@ -12,20 +12,14 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class InstanceDataDefine extends DataDefine {
public static final int DEFINE_ID = 102;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 6;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute("id", AttributeType.STRING, new NonOperation()));
addAttribute(0, new Attribute(InstanceTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(InstanceTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(InstanceTable.COLUMN_AGENTUUID, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(InstanceTable.COLUMN_AGENT_UUID, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(InstanceTable.COLUMN_REGISTER_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(4, new Attribute(InstanceTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(5, new Attribute(InstanceTable.COLUMN_HEARTBEAT_TIME, AttributeType.LONG, new CoverOperation()));
......
......@@ -26,7 +26,7 @@ public class InstanceEsTableDefine extends ElasticSearchTableDefine {
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_AGENTUUID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_AGENT_UUID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_REGISTER_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_HEARTBEAT_TIME, ElasticSearchColumnDefine.Type.Long.name()));
......
......@@ -14,7 +14,7 @@ public class InstanceH2TableDefine extends H2TableDefine {
@Override public void initialize() {
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_AGENTUUID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_AGENT_UUID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_REGISTER_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_HEARTBEAT_TIME, H2ColumnDefine.Type.Bigint.name()));
......
package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class InstanceTable {
public class InstanceTable extends CommonTable {
public static final String TABLE = "instance";
public static final String COLUMN_APPLICATION_ID = "application_id";
public static final String COLUMN_AGENTUUID = "agent_uuid";
public static final String COLUMN_AGENT_UUID = "agent_uuid";
public static final String COLUMN_REGISTER_TIME = "register_time";
public static final String COLUMN_INSTANCE_ID = "instance_id";
public static final String COLUMN_HEARTBEAT_TIME = "heartbeatTime";
......
......@@ -15,4 +15,6 @@ public interface IInstanceDAO {
void save(InstanceDataDefine.Instance instance);
void updateHeartbeatTime(int instanceId, long heartbeatTime);
int getApplicationId(int applicationInstanceId);
}
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.register.instance.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
......@@ -33,15 +34,14 @@ public class InstanceEsDAO extends EsDAO implements IInstanceDAO {
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_APPLICATION_ID, applicationId));
builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_AGENTUUID, agentUUID));
builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_AGENT_UUID, agentUUID));
searchRequestBuilder.setQuery(builder);
searchRequestBuilder.setSize(1);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
SearchHit searchHit = searchResponse.getHits().iterator().next();
int instanceId = (int)searchHit.getSource().get(InstanceTable.COLUMN_INSTANCE_ID);
return instanceId;
return (int)searchHit.getSource().get(InstanceTable.COLUMN_INSTANCE_ID);
}
return 0;
}
......@@ -57,10 +57,10 @@ public class InstanceEsDAO extends EsDAO implements IInstanceDAO {
@Override public void save(InstanceDataDefine.Instance instance) {
logger.debug("save instance register info, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
ElasticSearchClient client = getClient();
Map<String, Object> source = new HashMap();
Map<String, Object> source = new HashMap<>();
source.put(InstanceTable.COLUMN_INSTANCE_ID, instance.getInstanceId());
source.put(InstanceTable.COLUMN_APPLICATION_ID, instance.getApplicationId());
source.put(InstanceTable.COLUMN_AGENTUUID, instance.getAgentUUID());
source.put(InstanceTable.COLUMN_AGENT_UUID, instance.getAgentUUID());
source.put(InstanceTable.COLUMN_REGISTER_TIME, instance.getRegisterTime());
IndexResponse response = client.prepareIndex(InstanceTable.TABLE, instance.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
......@@ -75,10 +75,19 @@ public class InstanceEsDAO extends EsDAO implements IInstanceDAO {
updateRequest.id(String.valueOf(instanceId));
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
Map<String, Object> source = new HashMap();
Map<String, Object> source = new HashMap<>();
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, heartbeatTime);
updateRequest.doc(source);
client.update(updateRequest);
}
@Override public int getApplicationId(int applicationInstanceId) {
GetResponse response = getClient().prepareGet(InstanceTable.TABLE, String.valueOf(applicationInstanceId)).get();
if (response.isExists()) {
return (int)response.getSource().get(InstanceTable.COLUMN_APPLICATION_ID);
} else {
return 0;
}
}
}
......@@ -26,4 +26,8 @@ public class InstanceH2DAO extends H2DAO implements IInstanceDAO {
@Override public void updateHeartbeatTime(int instanceId, long heartbeatTime) {
}
@Override public int getApplicationId(int applicationInstanceId) {
return 0;
}
}
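The new IInstanceDAO.getApplicationId is presumably what backs the InstanceCache.get calls added to the span listeners. A minimal sketch of such a read-through cache; only the DAO method comes from this diff, the cache shape is assumed:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical read-through cache from application instance id to
// application id.
enum InstanceCacheSketch {
    INSTANCE;

    interface InstanceDAO {
        int getApplicationId(int applicationInstanceId);
    }

    private final Map<Integer, Integer> cache = new ConcurrentHashMap<>();

    public int get(InstanceDAO dao, int applicationInstanceId) {
        Integer cached = cache.get(applicationInstanceId);
        if (cached != null) {
            return cached;
        }
        int applicationId = dao.getApplicationId(applicationInstanceId);
        if (applicationId != 0) {    // 0 means "not registered yet"; never cache it
            cache.put(applicationInstanceId, applicationId);
        }
        return applicationId;
    }
}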
......@@ -12,18 +12,12 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class ServiceNameDataDefine extends DataDefine {
public static final int DEFINE_ID = 103;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 4;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute("id", AttributeType.STRING, new NonOperation()));
addAttribute(0, new Attribute(ServiceNameTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(ServiceNameTable.COLUMN_SERVICE_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(ServiceNameTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(ServiceNameTable.COLUMN_SERVICE_ID, AttributeType.INTEGER, new CoverOperation()));
......
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class ServiceNameTable {
public class ServiceNameTable extends CommonTable {
public static final String TABLE = "service_name";
public static final String COLUMN_SERVICE_NAME = "service_name";
public static final String COLUMN_APPLICATION_ID = "application_id";
......
......@@ -10,6 +10,8 @@ import org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSu
import org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenceWorker;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.ServiceRefSpanListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.stream.StreamModuleContext;
......@@ -38,10 +40,11 @@ public class SegmentParse {
spanListeners.add(new NodeRefSpanListener());
spanListeners.add(new NodeComponentSpanListener());
spanListeners.add(new NodeMappingSpanListener());
spanListeners.add(new NodeRefSpanListener());
spanListeners.add(new NodeRefSumSpanListener());
spanListeners.add(new SegmentCostSpanListener());
spanListeners.add(new GlobalTraceSpanListener());
spanListeners.add(new ServiceEntrySpanListener());
spanListeners.add(new ServiceRefSpanListener());
}
public void parse(List<UniqueId> traceIds, TraceSegmentObject segmentObject) {
......
package org.skywalking.apm.collector.agentstream.worker.segment.cost;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.ISegmentCostDAO;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......@@ -10,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......@@ -28,9 +26,12 @@ public class SegmentCostPersistenceWorker extends PersistenceWorker {
super.preStart();
}
@Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
ISegmentCostDAO dao = (ISegmentCostDAO)DAOContainer.INSTANCE.get(ISegmentCostDAO.class.getName());
return dao.prepareBatch(dataMap);
}
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(ISegmentCostDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<SegmentCostPersistenceWorker> {
......
......@@ -7,6 +7,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
......@@ -40,10 +41,14 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe
segmentCost.setCost(spanObject.getEndTime() - spanObject.getStartTime());
segmentCost.setStartTime(spanObject.getStartTime());
segmentCost.setEndTime(spanObject.getEndTime());
segmentCost.setOperationName(spanObject.getOperationName());
segmentCost.setId(segmentId);
segmentCosts.add(segmentCost);
if (spanObject.getOperationNameId() == 0) {
segmentCost.setServiceName(spanObject.getOperationName());
} else {
segmentCost.setServiceName(ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getOperationNameId()));
}
segmentCosts.add(segmentCost);
isError = isError || spanObject.getIsError();
}
......
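Because the listener now stores either a literal operation name or a marked operationNameId, any reader of the service_name column has to branch on the marker before display. A sketch consistent with the hypothetical marker used above:

// Reading the service_name column back: a marked value refers to a
// registered operation name id, a plain value is the literal name. "M" is
// the same assumed marker prefix as in the ExchangeMarkUtils sketch.
class ServiceNameResolverSketch {
    interface ServiceNameRegistry {
        String lookup(int serviceId);
    }

    static String resolve(String stored, ServiceNameRegistry registry) {
        if (stored.startsWith("M")) {
            int serviceId = Integer.parseInt(stored.substring(1));
            return registry.lookup(serviceId);
        }
        return stored;   // raw operation name, recorded before the exchange completed
    }
}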
package org.skywalking.apm.collector.agentstream.worker.segment.cost.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface ISegmentCostDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.segment.cost.dao;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO {
public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
private final Logger logger = LoggerFactory.getLogger(SegmentCostEsDAO.class);
@Override public List<?> prepareBatch(Map<String, Data> dataMap) {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
logger.debug("segment cost prepareBatch, id: {}", id);
Map<String, Object> source = new HashMap();
source.put(SegmentCostTable.COLUMN_SEGMENT_ID, data.getDataString(1));
source.put(SegmentCostTable.COLUMN_OPERATION_NAME, data.getDataString(2));
source.put(SegmentCostTable.COLUMN_COST, data.getDataLong(0));
source.put(SegmentCostTable.COLUMN_START_TIME, data.getDataLong(1));
source.put(SegmentCostTable.COLUMN_END_TIME, data.getDataLong(2));
source.put(SegmentCostTable.COLUMN_IS_ERROR, data.getDataBoolean(0));
source.put(SegmentCostTable.COLUMN_TIME_BUCKET, data.getDataLong(3));
logger.debug("segment cost source: {}", source.toString());
IndexRequestBuilder builder = getClient().prepareIndex(SegmentCostTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
}
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
logger.debug("segment cost prepareBatchInsert, id: {}", data.getDataString(0));
Map<String, Object> source = new HashMap<>();
source.put(SegmentCostTable.COLUMN_SEGMENT_ID, data.getDataString(1));
source.put(SegmentCostTable.COLUMN_SERVICE_NAME, data.getDataString(2));
source.put(SegmentCostTable.COLUMN_COST, data.getDataLong(0));
source.put(SegmentCostTable.COLUMN_START_TIME, data.getDataLong(1));
source.put(SegmentCostTable.COLUMN_END_TIME, data.getDataLong(2));
source.put(SegmentCostTable.COLUMN_IS_ERROR, data.getDataBoolean(0));
source.put(SegmentCostTable.COLUMN_TIME_BUCKET, data.getDataLong(3));
logger.debug("segment cost source: {}", source.toString());
return getClient().prepareIndex(SegmentCostTable.TABLE, data.getDataString(0)).setSource(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.segment.cost.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
......@@ -9,7 +7,4 @@ import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
*/
public class SegmentCostH2DAO extends H2DAO implements ISegmentCostDAO {
@Override public List<?> prepareBatch(Map map) {
return null;
}
}
......@@ -14,10 +14,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class SegmentCostDataDefine extends DataDefine {
@Override public int defineId() {
return 402;
}
@Override protected int initialCapacity() {
return 8;
}
......@@ -25,7 +21,7 @@ public class SegmentCostDataDefine extends DataDefine {
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(SegmentCostTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(SegmentCostTable.COLUMN_SEGMENT_ID, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(SegmentCostTable.COLUMN_OPERATION_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(SegmentCostTable.COLUMN_SERVICE_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(SegmentCostTable.COLUMN_COST, AttributeType.LONG, new CoverOperation()));
addAttribute(4, new Attribute(SegmentCostTable.COLUMN_START_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(SegmentCostTable.COLUMN_END_TIME, AttributeType.LONG, new CoverOperation()));
......@@ -36,13 +32,13 @@ public class SegmentCostDataDefine extends DataDefine {
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
String segmentId = remoteData.getDataStrings(1);
String operationName = remoteData.getDataStrings(2);
String serviceName = remoteData.getDataStrings(2);
Long cost = remoteData.getDataLongs(0);
Long startTime = remoteData.getDataLongs(1);
Long endTime = remoteData.getDataLongs(2);
Boolean isError = remoteData.getDataBooleans(0);
Long timeBucket = remoteData.getDataLongs(3);
return new SegmentCost(id, segmentId, operationName, cost, startTime, endTime, isError, timeBucket);
return new SegmentCost(id, segmentId, serviceName, cost, startTime, endTime, isError, timeBucket);
}
@Override public RemoteData serialize(Object object) {
......@@ -50,7 +46,7 @@ public class SegmentCostDataDefine extends DataDefine {
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(segmentCost.getId());
builder.addDataStrings(segmentCost.getSegmentId());
builder.addDataStrings(segmentCost.getOperationName());
builder.addDataStrings(segmentCost.getServiceName());
builder.addDataLongs(segmentCost.getCost());
builder.addDataLongs(segmentCost.getStartTime());
builder.addDataLongs(segmentCost.getEndTime());
......@@ -62,18 +58,18 @@ public class SegmentCostDataDefine extends DataDefine {
public static class SegmentCost implements Transform {
private String id;
private String segmentId;
private String operationName;
private String serviceName;
private Long cost;
private Long startTime;
private Long endTime;
private boolean isError;
private long timeBucket;
SegmentCost(String id, String segmentId, String operationName, Long cost,
SegmentCost(String id, String segmentId, String serviceName, Long cost,
Long startTime, Long endTime, boolean isError, long timeBucket) {
this.id = id;
this.segmentId = segmentId;
this.operationName = operationName;
this.serviceName = serviceName;
this.cost = cost;
this.startTime = startTime;
this.endTime = endTime;
......@@ -89,7 +85,7 @@ public class SegmentCostDataDefine extends DataDefine {
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataString(1, this.segmentId);
data.setDataString(2, this.operationName);
data.setDataString(2, this.serviceName);
data.setDataLong(0, this.cost);
data.setDataLong(1, this.startTime);
data.setDataLong(2, this.endTime);
......@@ -118,12 +114,12 @@ public class SegmentCostDataDefine extends DataDefine {
this.segmentId = segmentId;
}
public String getOperationName() {
return operationName;
public String getServiceName() {
return serviceName;
}
public void setOperationName(String operationName) {
this.operationName = operationName;
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
public Long getCost() {
......
......@@ -26,7 +26,7 @@ public class SegmentCostEsTableDefine extends ElasticSearchTableDefine {
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_SEGMENT_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_OPERATION_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_SERVICE_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_COST, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_START_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_END_TIME, ElasticSearchColumnDefine.Type.Long.name()));
......
......@@ -15,7 +15,7 @@ public class SegmentCostH2TableDefine extends H2TableDefine {
@Override public void initialize() {
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_SEGMENT_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_OPERATION_NAME, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_SERVICE_NAME, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_COST, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_START_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_END_TIME, H2ColumnDefine.Type.Bigint.name()));
......
......@@ -10,7 +10,7 @@ public class SegmentCostTable extends CommonTable {
public static final String COLUMN_SEGMENT_ID = "segment_id";
public static final String COLUMN_START_TIME = "start_time";
public static final String COLUMN_END_TIME = "end_time";
public static final String COLUMN_OPERATION_NAME = "operation_name";
public static final String COLUMN_SERVICE_NAME = "service_name";
public static final String COLUMN_COST = "cost";
public static final String COLUMN_IS_ERROR = "is_error";
}
package org.skywalking.apm.collector.agentstream.worker.segment.origin;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.dao.ISegmentDAO;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......@@ -10,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......@@ -28,9 +26,12 @@ public class SegmentPersistenceWorker extends PersistenceWorker {
super.preStart();
}
@Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
ISegmentDAO dao = (ISegmentDAO)DAOContainer.INSTANCE.get(ISegmentDAO.class.getName());
return dao.prepareBatch(dataMap);
}
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(ISegmentDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<SegmentPersistenceWorker> {
......
package org.skywalking.apm.collector.agentstream.worker.segment.origin.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface ISegmentDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.segment.origin.dao;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentEsDAO extends EsDAO implements ISegmentDAO {
public class SegmentEsDAO extends EsDAO implements ISegmentDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
private final Logger logger = LoggerFactory.getLogger(SegmentEsDAO.class);
@Override public List<?> prepareBatch(Map<String, Data> dataMap) {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
logger.debug("segment prepareBatch, id: {}", id);
Map<String, Object> source = new HashMap();
source.put(SegmentTable.COLUMN_DATA_BINARY, new String(Base64.getEncoder().encode(data.getDataBytes(0))));
logger.debug("segment source: {}", source.toString());
IndexRequestBuilder builder = getClient().prepareIndex(SegmentTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
}
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(SegmentTable.COLUMN_DATA_BINARY, new String(Base64.getEncoder().encode(data.getDataBytes(0))));
logger.debug("segment source: {}", source.toString());
return getClient().prepareIndex(SegmentTable.TABLE, data.getDataString(0)).setSource(source);
}
}
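Since data_binary is written Base64-encoded, the read path has to reverse the encoding before handing bytes to the protobuf parser. A short sketch; the parseFrom step is only named in a comment because the generated TraceSegmentObject API sits outside this diff:

import java.util.Base64;

class SegmentReadSketch {
    // Reverse of the write path above: the column holds Base64 text of the
    // original protobuf bytes.
    static byte[] decodeDataBinary(String storedColumnValue) {
        return Base64.getDecoder().decode(storedColumnValue);
    }
    // A caller would then feed the bytes to TraceSegmentObject.parseFrom(...)
    // (the protobuf-generated parser used elsewhere in the collector).
}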
package org.skywalking.apm.collector.agentstream.worker.segment.origin.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class SegmentH2DAO extends H2DAO implements ISegmentDAO {
@Override public List<?> prepareBatch(Map map) {
return null;
}
}
......@@ -15,12 +15,6 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class SegmentDataDefine extends DataDefine {
public static final int DEFINE_ID = 401;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 2;
}
......
package org.skywalking.apm.collector.agentstream.worker.service.entry;
import org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceEntryAggregationWorker extends AggregationWorker {
public ServiceEntryAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException {
return getClusterContext().lookup(ServiceEntryRemoteWorker.WorkerRole.INSTANCE);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceEntryAggregationWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceEntryAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceEntryAggregationWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceEntryAggregationWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceEntryDataDefine();
}
}
}
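HashCodeSelector is what keeps every message with the same aggregation id on the same worker, which is what makes in-memory aggregation safe before the persistence stage. A minimal sketch of that routing invariant; the real selector's signature lives in the stream module:

import java.util.List;

// Routing rule sketch: equal ids always map to the same worker index, so
// all fragments of one aggregation key are merged by a single worker.
class HashCodeSelectorSketch {
    static <W> W select(List<W> workers, String messageId) {
        int index = Math.floorMod(messageId.hashCode(), workers.size());
        return workers.get(index);
    }
}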
package org.skywalking.apm.collector.agentstream.worker.service.entry;
import org.skywalking.apm.collector.agentstream.worker.service.entry.dao.IServiceEntryDAO;
import org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceEntryPersistenceWorker extends PersistenceWorker {
public ServiceEntryPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
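// true: the persistence worker merges in-memory data with the stored row (fetched through the DAO's get) before flushing, instead of blindly inserting.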
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IServiceEntryDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceEntryPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceEntryPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceEntryPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceEntryPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceEntryDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.service.entry;
import org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceEntryRemoteWorker extends AbstractRemoteWorker {
protected ServiceEntryRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
}
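// Messages arrive here after being routed from another collector node; this worker simply hands them to the local persistence worker.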
@Override protected void onWork(Object message) throws WorkerException {
getClusterContext().lookup(ServiceEntryPersistenceWorker.WorkerRole.INSTANCE).tell(message);
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceEntryRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceEntryRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceEntryRemoteWorker(role(), clusterContext);
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceEntryRemoteWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceEntryDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.service.entry;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener, EntrySpanListener {
private final Logger logger = LoggerFactory.getLogger(ServiceEntrySpanListener.class);
private long timeBucket;
private boolean hasReference = false;
private String agg;
private int applicationId;
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String entryServiceName = spanObject.getOperationName();
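// When the operation name has already been registered, prefer its marked id over the literal name.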
if (spanObject.getOperationNameId() != 0) {
entryServiceName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getOperationNameId());
}
this.agg = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId) + Const.ID_SPLIT + entryServiceName;
this.applicationId = applicationId;
}
@Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
String segmentId) {
hasReference = true;
}
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
}
@Override public void build() {
logger.debug("entry service listener build");
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
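// Only segments without an incoming reference are service entry points; referenced segments are internal hops and produce no ServiceEntry record.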
if (!hasReference) {
ServiceEntryDataDefine.ServiceEntry serviceEntry = new ServiceEntryDataDefine.ServiceEntry();
serviceEntry.setId(timeBucket + Const.ID_SPLIT + agg);
serviceEntry.setApplicationId(applicationId);
serviceEntry.setAgg(agg);
serviceEntry.setTimeBucket(timeBucket);
try {
logger.debug("send to service entry aggregation worker, id: {}", serviceEntry.getId());
context.getClusterWorkerContext().lookup(ServiceEntryAggregationWorker.WorkerRole.INSTANCE).tell(serviceEntry.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
}
package org.skywalking.apm.collector.agentstream.worker.service.entry.dao;
/**
* @author pengys5
*/
public interface IServiceEntryDAO {
}
package org.skywalking.apm.collector.agentstream.worker.service.entry.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class ServiceEntryEsDAO extends EsDAO implements IServiceEntryDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
@Override public Data get(String id, DataDefine dataDefine) {
GetResponse getResponse = getClient().prepareGet(ServiceEntryTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataInteger(0, (Integer)source.get(ServiceEntryTable.COLUMN_APPLICATION_ID));
data.setDataString(1, (String)source.get(ServiceEntryTable.COLUMN_AGG));
data.setDataLong(0, (Long)source.get(ServiceEntryTable.COLUMN_TIME_BUCKET));
return data;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceEntryTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(ServiceEntryTable.COLUMN_AGG, data.getDataString(1));
source.put(ServiceEntryTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareIndex(ServiceEntryTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceEntryTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(ServiceEntryTable.COLUMN_AGG, data.getDataString(1));
source.put(ServiceEntryTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareUpdate(ServiceEntryTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.service.entry.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class ServiceEntryH2DAO extends H2DAO implements IServiceEntryDAO {
}
package org.skywalking.apm.collector.agentstream.worker.service.entry.define;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class ServiceEntryDataDefine extends DataDefine {
@Override protected int initialCapacity() {
return 4;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(ServiceEntryTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(ServiceEntryTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new NonOperation()));
addAttribute(2, new Attribute(ServiceEntryTable.COLUMN_AGG, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(ServiceEntryTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
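// Remote (de)serialization is not implemented for this data type yet; both directions return null.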
@Override public Object deserialize(RemoteData remoteData) {
return null;
}
@Override public RemoteData serialize(Object object) {
return null;
}
public static class ServiceEntry implements Transform<ServiceEntry> {
private String id;
private int applicationId;
private String agg;
private long timeBucket;
ServiceEntry(String id, int applicationId, String agg, long timeBucket) {
this.id = id;
this.applicationId = applicationId;
this.agg = agg;
this.timeBucket = timeBucket;
}
public ServiceEntry() {
}
@Override public Data toData() {
ServiceEntryDataDefine define = new ServiceEntryDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationId);
data.setDataString(1, this.agg);
data.setDataLong(0, this.timeBucket);
return data;
}
@Override public ServiceEntry toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationId = data.getDataInteger(0);
this.agg = data.getDataString(1);
this.timeBucket = data.getDataLong(0);
return this;
}
public String getId() {
return id;
}
public String getAgg() {
return agg;
}
public long getTimeBucket() {
return timeBucket;
}
public void setId(String id) {
this.id = id;
}
public void setAgg(String agg) {
this.agg = agg;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
public int getApplicationId() {
return applicationId;
}
public void setApplicationId(int applicationId) {
this.applicationId = applicationId;
}
}
}
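The Transform methods above copy fields between the ServiceEntry POJO and the slot-indexed Data holder (string slot 0 is the id, integer slot 0 the application id, string slot 1 the agg, long slot 0 the time bucket). A minimal round-trip sketch using only the accessors defined above; all field values are illustrative, and the id merely mimics the timeBucket + ID_SPLIT + agg convention used by the listener:
ServiceEntryDataDefine.ServiceEntry entry = new ServiceEntryDataDefine.ServiceEntry();
entry.setId("201708011200_101_entryService"); // hypothetical id
entry.setApplicationId(101);
entry.setAgg("101_entryService"); // hypothetical agg value
entry.setTimeBucket(201708011200L);
Data data = entry.toData(); // POJO -> Data slots
ServiceEntryDataDefine.ServiceEntry copy = new ServiceEntryDataDefine.ServiceEntry().toSelf(data); // Data slots -> POJO
assert copy.getApplicationId() == 101 && copy.getTimeBucket() == 201708011200L;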
package org.skywalking.apm.collector.agentstream.worker.service.entry.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class ServiceEntryEsTableDefine extends ElasticSearchTableDefine {
public ServiceEntryEsTableDefine() {
super(ServiceEntryTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceEntryTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceEntryTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.service.entry.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class ServiceEntryH2TableDefine extends H2TableDefine {
public ServiceEntryH2TableDefine() {
super(ServiceEntryTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(ServiceEntryTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ServiceEntryTable.COLUMN_AGG, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ServiceEntryTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.service.entry.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class ServiceEntryTable extends CommonTable {
public static final String TABLE = "service_entry";
public static final String COLUMN_APPLICATION_ID = "application_id";
}
-package org.skywalking.apm.collector.agentstream.worker.node.component;
+package org.skywalking.apm.collector.agentstream.worker.serviceref.reference;
-import org.skywalking.apm.collector.agentstream.worker.cache.ComponentCache;
-import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
+import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
-import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
-import org.skywalking.apm.collector.stream.worker.impl.ExchangeWorker;
-import org.skywalking.apm.collector.stream.worker.impl.data.Data;
+import org.skywalking.apm.collector.stream.worker.WorkerRefs;
+import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
-public class NodeComponentExchangeWorker extends ExchangeWorker {
+public class ServiceRefAggregationWorker extends AggregationWorker {
-private final Logger logger = LoggerFactory.getLogger(NodeComponentExchangeWorker.class);
-public NodeComponentExchangeWorker(Role role, ClusterWorkerContext clusterContext) {
+public ServiceRefAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
......@@ -31,30 +25,19 @@ public class NodeComponentExchangeWorker extends ExchangeWorker {
super.preStart();
}
-@Override protected void exchange(Data data) {
-NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
-nodeComponent.toSelf(data);
-int componentId = ComponentCache.get(nodeComponent.getApplicationId(), nodeComponent.getComponentName());
-if (componentId == 0 && nodeComponent.getTimes() < 10) {
-try {
-nodeComponent.increase();
-getClusterContext().lookup(NodeComponentExchangeWorker.WorkerRole.INSTANCE).tell(nodeComponent.toData());
-} catch (WorkerNotFoundException | WorkerInvokeException e) {
-logger.error(e.getMessage(), e);
-}
-}
+@Override protected WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException {
+return getClusterContext().lookup(ServiceRefRemoteWorker.WorkerRole.INSTANCE);
}
-public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeComponentExchangeWorker> {
+public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceRefAggregationWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
-public NodeComponentExchangeWorker workerInstance(ClusterWorkerContext clusterContext) {
-return new NodeComponentExchangeWorker(role(), clusterContext);
+public ServiceRefAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
+return new ServiceRefAggregationWorker(role(), clusterContext);
}
@Override
......@@ -68,7 +51,7 @@ public class NodeComponentExchangeWorker extends ExchangeWorker {
@Override
public String roleName() {
-return NodeComponentExchangeWorker.class.getSimpleName();
+return ServiceRefAggregationWorker.class.getSimpleName();
}
@Override
......@@ -77,7 +60,7 @@ public class NodeComponentExchangeWorker extends ExchangeWorker {
}
@Override public DataDefine dataDefine() {
-return new NodeComponentDataDefine();
+return new ServiceRefDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.IServiceRefDAO;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class ServiceRefPersistenceWorker extends PersistenceWorker {
public ServiceRefPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IServiceRefDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceRefPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceRefPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceRefPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceRefPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceRefDataDefine();
}
}
}