提交 75e2b7ba 编写于 作者: X Xin,Zhang 提交者: wu-sheng

support segment send service and app register service (#6)

上级 4fe9ef9f
......@@ -20,6 +20,7 @@
const Instrumentation = require("./instrumentation");
const ContextManager = require("./trace/context-manager");
const PluginManager = require("./plugins/plugin-manager");
const ServiceManager = require("./services");
const AgentConfig = require("./config");
module.exports = Agent;
......@@ -29,13 +30,17 @@ module.exports = Agent;
function Agent() {
this._instrumentation = null;
this._contextManager = null;
this._config = null;
this._serviceManager = null;
}
Agent.prototype.start = function(agentOptions) {
AgentConfig.initConfig(agentOptions);
this._contextManager = new ContextManager(this._agentConfig);
this._instrumentation = new Instrumentation();
this._serviceManager = new ServiceManager();
this._serviceManager.launch();
let _pluginManager = new PluginManager();
let _agent = this;
require("./require-module-hook")(_pluginManager.allEnhanceModules(),
......
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const logger = require("../logger");
let TraceSegmentCachePool = function() {
let _bucket = [];
let _bucketSize = Number.isSafeInteger(256);
let _timeout = undefined;
let _consumer = undefined;
let _flushInterval = 1000;
this.put = function(traceSegment) {
logger.debug("trace-segment-cache-pool", "put new TraceSegment");
_bucket.push(traceSegment);
if (_bucketSize !== -1 && _bucket.length >= _bucketSize) {
this.consumeData();
} else if (!this._timeout) {
this.scheduleConsumeData();
}
};
this.consumeData = function() {
logger.debug("trace-segment-cache-pool", "consumer {} TraceSegments.", _bucket.length);
if (_bucket.length != 0) {
_consumer(_bucket);
}
this._clear();
};
this.scheduleConsumeData = function() {
let self = this;
_timeout = setTimeout(function() {
self.consumeData();
}, _flushInterval);
_timeout.unref();
};
this._clear = function() {
clearTimeout(_timeout);
_bucket = [];
_timeout = undefined;
};
this.registerConsumer = function(consumer) {
_consumer = consumer;
};
};
TraceSegmentCachePool.instance = null;
TraceSegmentCachePool.getInstance = function() {
if (this.instance === null) {
this.instance = new TraceSegmentCachePool();
}
return this.instance;
};
module.exports = TraceSegmentCachePool.getInstance();
......@@ -16,8 +16,9 @@
*/
const AgentConfig = function() {
let _applicationCode = undefined;
let _applicationId = 1;
let _applicationInstanceId = 1;
let _applicationId = undefined;
let _applicationInstanceId = undefined;
let _directServices = undefined;
this.getApplicationId = function() {
return _applicationId;
......@@ -38,11 +39,25 @@ const AgentConfig = function() {
return _applicationCode;
};
this.getDirectServices = function() {
return _directServices;
};
this.setDirectServices = function(directServices) {
_directServices = directServices;
};
this.initConfig = function(agentOptions) {
if (!agentOptions.hasOwnProperty("applicationCode")) {
throw new Error("application Code cannot be empty");
}
_applicationCode = agentOptions.applicationCode;
_directServices = "localhost:11800";
// TODO for now, only support one address
if (agentOptions.hasOwnProperty("directServers")) {
_directServices = agentOptions.directServers;
}
};
if (AgentConfig.caller != AgentConfig.getInstance) {
......
......@@ -19,8 +19,7 @@
module.exports = Instrumentation;
const debug = require("debug")("skywalking-instrumentation");
const error = debug("skywalking-instrumentation:error");
const logger = require("../logger");
/**
* @constructor
......@@ -88,7 +87,7 @@ Instrumentation.prototype.enhanceCallback = function(
if (typeof originCallBack !== "function") return originCallBack;
if (!traceContext) {
error(
logger.warn("skywalking-instrumentation",
"The Callback method won't be enhance because of TraceContext is undefined.");
return originCallBack;
}
......
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const Logger = function() {
let _logger = require("debug");
this.debug = function(className, format, ...args) {
_logger(className + ":debug")(format, ...args);
};
this.info = function(className, format, ...args) {
_logger(className + ":info")(format, ...args);
};
this.warn = function(className, format, ...args) {
_logger(className + ":warn")(format, ...args);
};
this.error = function(className, format, ...args) {
_logger(className + ":error")(format, ...args);
};
};
Logger.instance = null;
Logger.getInstance = function() {
if (this.instance === null) {
this.instance = new Logger();
}
return this.instance;
};
module.exports = Logger.getInstance();
......@@ -18,7 +18,7 @@
"use strict";
module.exports = PluginManager;
const debug = require("debug")("skywalking-plugin-manager");
const logger = require("../logger");
const OFFICER_SUPPORTED_MODULE = ["mysql", "http"];
/**
......@@ -36,6 +36,6 @@ PluginManager.prototype.findPlugin = function(name, version) {
};
PluginManager.prototype.allEnhanceModules = function() {
debug("loaded enhance modules: %s", this._enhanceModule);
logger.info("skywalking-plugin-manager", "loaded enhance modules: %s", this._enhanceModule);
return this._enhanceModule;
};
......@@ -18,6 +18,8 @@
"use strict";
const endOfStream = require("end-of-stream");
const ContextCarrier = require("../../trace/context-carrier");
const layerDefine = require("../../trace/span-layer");
const componentDefine = require("../../trace/component-define");
/**
*
......@@ -51,6 +53,8 @@ module.exports = function(httpModule, instrumentation, contextManager) {
});
let span = contextManager.createEntrySpan(req.url, contextCarrier);
span.component(componentDefine.Components.HTTP);
span.spanLayer(layerDefine.Layers.HTTP);
endOfStream(res, function(err) {
if (err) {
span.errorOccurred();
......@@ -80,6 +84,8 @@ module.exports = function(httpModule, instrumentation, contextManager) {
contextCarrier.pushBy(function(key, value) {
options.headers[key] = value;
});
span.component(componentDefine.Components.HTTP);
span.spanLayer(layerDefine.Layers.HTTP);
let result = original.apply(this, arguments);
contextManager.finishSpan(span);
return result;
......
......@@ -17,6 +17,9 @@
"use strict";
const spanLayer = require("../../trace/span-layer");
const componentDefine = require("../../trace/component-define");
/**
* @param {originModule} originModule
* @param {instrumentation} instrumentation
......@@ -63,7 +66,8 @@ function enhanceQueryMethod(obj, instrumentation, contextManager) {
return function(sql, values, cb) {
let span = contextManager.createExitSpan(sql, connection.config.host +
":" + connection.config.port);
span.component(componentDefine.Components.MYSQL);
span.spanLayer(spanLayer.Layers.DB);
let enhancedValues = values;
let enhanceCallback = cb;
......
......@@ -20,8 +20,7 @@
const Module = require("module");
const path = require("path");
const originLoad = Module._load;
const debug = require("debug")("skywalking-hook");
const error = debug("skywalking-hook:error");
const logger = require("./logger");
/**
*
......@@ -36,7 +35,7 @@ module.exports = function hook(modules, requireCallback) {
let exports = originLoad.apply(Module, arguments);
if (modules.indexOf(request) == -1) {
debug("Module[%s] not in the enhance list. skip it.", request);
logger.info("skywalking-hook", "Module[%s] not in the enhance list. skip it.", request);
return exports;
}
......@@ -56,7 +55,7 @@ module.exports = function hook(modules, requireCallback) {
return hook.enhancedModuleCache[request];
} catch (e) {
error("Failed to enhance module[%s]. error message: %s", request,
logger.error("skywalking-hook", "Failed to enhance module[%s]. error message: %s", request,
e.description);
return exports;
}
......
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
module.exports = AppAndInstanceDiscoveryService;
const os = require("os");
const grpc = require("grpc");
const uuid = require("uuid/v4");
const process = require("process");
const logger = require("../logger");
const AgentConfig = require("../config");
const DiscoveryService = require("../network/DiscoveryService_grpc_pb");
const DiscoveryServiceParameters = require("../network/DiscoveryService_pb");
const ApplicationRegisterService = require("../network/ApplicationRegisterService_grpc_pb");
const ApplicationRegisterServiceParameters = require("../network/ApplicationRegisterService_pb");
/**
* @param {directServers} directServers
* @constructor
* @author zhang xin
*/
function AppAndInstanceDiscoveryService(directServers) {
this._directServers = directServers;
this._processUUID = uuid();
this._applicationRegisterServiceStub = new ApplicationRegisterService.ApplicationRegisterServiceClient(
this._directServers,
grpc.credentials.createInsecure());
this._instanceDiscoveryServiceStub = new DiscoveryService.InstanceDiscoveryServiceClient(this._directServers,
grpc.credentials.createInsecure());
}
AppAndInstanceDiscoveryService.prototype.registryApplication = function(callback) {
if (AgentConfig.getApplicationId() && AgentConfig.getApplicationInstanceId()) {
return;
}
let applicationParameter = new ApplicationRegisterServiceParameters.Application();
applicationParameter.setApplicationcode(AgentConfig.getApplicationCode());
let that = this;
this._applicationRegisterServiceStub.applicationCodeRegister(applicationParameter, function(err, response) {
if (err) {
logger.error("application-register-service", "failed to register application code. error message: %s",
err.message);
return;
}
if (response && response.hasApplication()) {
that.instanceDiscovery(response.getApplication().getValue(), callback);
}
});
};
AppAndInstanceDiscoveryService.prototype.instanceDiscovery = function(applicationId, callback) {
let instanceParameter = new DiscoveryServiceParameters.ApplicationInstance();
instanceParameter.setApplicationid(applicationId);
instanceParameter.setOsinfo(buildOsInfo());
instanceParameter.setRegistertime(new Date().getMilliseconds());
instanceParameter.setAgentuuid(this._processUUID);
this._instanceDiscoveryServiceStub.registerInstance(instanceParameter, function(err, response) {
if (err) {
logger.error("instance-discovery-service", "failed to register application[%s] instance. error message: %s",
AgentConfig.getApplicationCode(), err.message);
return;
}
if (response && response.getApplicationinstanceid() != 0) {
logger.info("application-service-register", "application[%s] has been registered.",
AgentConfig.getApplicationCode());
callback(applicationId, response.getApplicationinstanceid());
}
});
/**
* @return {DiscoveryServiceParameters.OSInfo}
*/
function buildOsInfo() {
let osInfoParameter = new DiscoveryServiceParameters.OSInfo();
osInfoParameter.setOsname(os.platform());
osInfoParameter.setHostname(os.hostname());
osInfoParameter.setProcessno(process.pid);
osInfoParameter.setIpv4sList(getAllIPv4Address());
return osInfoParameter;
}
/**
* @return {Array}
*/
function getAllIPv4Address() {
let ipv4Address = [];
let ifaces = os.networkInterfaces();
Object.keys(ifaces).forEach(function(ifname) {
ifaces[ifname].forEach(function(iface) {
if ("IPv4" !== iface.family || iface.internal !== false) {
return;
}
ipv4Address.push(iface.address);
});
});
return ipv4Address;
}
};
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
module.exports = ServiceManager;
const AgentConfig = require("../config");
const traceSegmentCache = require("../cache");
const ApplicationRegisterService = require("./application-register-service");
const TraceSegmentSendService = require("./trace-segment-send-service");
/**
*
* @constructor
* @author zhang xin
*/
function ServiceManager() {
let directServers = AgentConfig.getDirectServices();
this._applicationRegisterService = new ApplicationRegisterService(directServers);
this._traceSegmentSendService = new TraceSegmentSendService(directServers);
}
ServiceManager.prototype.launch = function() {
let applicationRegisterService = this._applicationRegisterService;
let timer = setInterval(function() {
applicationRegisterService.registryApplication.apply(applicationRegisterService, [
function(applicationId, applicationInstanceId) {
AgentConfig.setApplicationId(applicationId);
AgentConfig.setApplicationInstanceId(applicationInstanceId);
timer.unref();
}]);
}, 1000);
let traceSegmentSendService = this._traceSegmentSendService;
traceSegmentCache.registerConsumer(function(segmentData) {
traceSegmentSendService.sendTraceSegment.apply(traceSegmentSendService, arguments);
});
};
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
module.exports = TraceSegmentSender;
const grpc = require("grpc");
const async = require("async");
const logger = require("../logger");
const TraceSegmentService = require("../network/TraceSegmentService_grpc_pb");
/**
* @param {directServers} directServers
* @constructor
* @author zhang xin
*/
function TraceSegmentSender(directServers) {
this._directServers = directServers;
this._traceSegmentServiceStub = new TraceSegmentService.TraceSegmentServiceClient(this._directServers,
grpc.credentials.createInsecure());
}
TraceSegmentSender.prototype.sendTraceSegment = function(traceSegments) {
let call = this._traceSegmentServiceStub.collect(function(error, stat) {
if (error) {
return;
}
});
/**
* @param {traceSegment} traceSegment
* @return {Function}
*/
function traceSegmentSender(traceSegment) {
return function() {
call.write(traceSegment.transform());
};
}
let traceSegmentSenders = [];
for (let i = 0; i < traceSegments.length; i++) {
traceSegmentSenders[i] = traceSegmentSender(traceSegments[i]);
}
async.parallelLimit(traceSegmentSenders, 5, function(err, result) {
if (err) {
logger.warn("trace-segment-send", "failed to send trace segment data.");
}
call.end();
});
};
......@@ -15,16 +15,38 @@
* limitations under the License.
*/
module.exports = new ApplicationRegisterService();
/**
* @param {id} id
* @param {name} name
* @constructor
* @author zhang xin
*/
function ApplicationRegisterService() {
function OfficeComponent(id, name) {
this._id = id;
this._name = name;
}
ApplicationRegisterService.prototype.registryApplication = function(applicationCode, callback) {
OfficeComponent.prototype.getId = function() {
return this._id;
};
OfficeComponent.prototype.getName = function() {
return this._name;
};
let Components = function() {
this.HTTP = new OfficeComponent(2, "HTTP");
this.MYSQL = new OfficeComponent(5, "MYSQL");
};
Components.instance = null;
Components.getInstance = function() {
if (this.instance === null) {
this.instance = new Components();
}
return this.instance;
};
exports.Components = Components.getInstance();
exports.OfficeComponent = OfficeComponent;
......@@ -150,7 +150,7 @@ ContextCarrier.prototype.getPrimaryDistributedTraceId = function() {
};
ContextCarrier.prototype.isInvalidate = function() {
return isEmpty(this._traceSegmentId) || isEmpty(this._spanId) ||
return isEmpty(this._traceSegmentId) || isIllegalSegmentId(this._traceSegmentId) || isEmpty(this._spanId) ||
isEmpty(this._parentApplicationInstanceId)
|| isEmpty(this._entryApplicationInstanceId) || isEmpty(this._peerHost) ||
isEmpty(this._entryOperationName)
......@@ -165,6 +165,14 @@ ContextCarrier.prototype.isInvalidate = function() {
function isEmpty(value) {
return value == "" || value == undefined;
}
/**
* @param {traceSegmentId} traceSegmentId
* @return {boolean}
*/
function isIllegalSegmentId(traceSegmentId) {
return traceSegmentId.split(".").length != 3;
}
};
ContextCarrier.prototype.fetchBy = function(callback) {
......@@ -176,4 +184,3 @@ ContextCarrier.prototype.pushBy = function(callback) {
let serializedTracingContext = this.serialize();
return callback(SW_HEADER_KEY, serializedTracingContext);
};
......@@ -18,12 +18,12 @@
"use strict";
module.exports = ContextManager;
let TraceContext = require("./trace-context");
let DictionaryManager = require("../dictionary/dictionary-manager");
let AgentConfig = require("../config");
let NoopSpan = require("./noop-span");
let NoopTraceContext = require("./noop-trace-context");
let debug = require("debug")("skywalking-context-manager");
const TraceContext = require("./trace-context");
const DictionaryManager = require("../dictionary/dictionary-manager");
const AgentConfig = require("../config");
const NoopSpan = require("./noop-span");
const NoopTraceContext = require("./noop-trace-context");
const logger = require("../logger");
const NOOP_TRACE_CONTEXT = new NoopTraceContext();
const NOOP_SPAN = new NoopSpan(NOOP_TRACE_CONTEXT);
......@@ -38,12 +38,12 @@ function ContextManager() {
let traceContext = NOOP_TRACE_CONTEXT;
if (!AgentConfig.getApplicationId() ||
!AgentConfig.getApplicationInstanceId()) {
debug("use the noop-span before the application has been registered.");
logger.debug("context-manager", "use the noop-span before the application has been registered.");
return traceContext.span();
}
if (typeof this._activeTraceContext == "NoopTraceContext") {
debug(
logger.debug("context-manager",
"use the noop-span because of the parent trace context is NoopTraceContext.");
return traceContext.span();
}
......@@ -110,7 +110,7 @@ ContextManager.prototype.createEntrySpan = function(
ContextManager.prototype.createExitSpan = function(
operationName, peerId, contextCarrier) {
if (!AgentConfig.getApplicationId()) {
debug("use the noop-span before the application has been registered.");
logger.debug("context-manager", "use the noop-span before the application has been registered.");
return NOOP_SPAN;
}
......
......@@ -26,9 +26,13 @@ module.exports = LogDataEntity;
*/
function LogDataEntity(timestamp, logs) {
this._timestamp = timestamp;
this._logs = logs;
this._data = logs;
}
LogDataEntity.prototype.getLogs = function() {
return this._logs;
LogDataEntity.prototype.getTimestamp = function() {
return this._timestamp;
};
LogDataEntity.prototype.getData = function() {
return this._data;
};
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const TraceSegmentServiceParameters = require("../network/TraceSegmentService_pb");
/**
*
* @param {id} id
* @param {grpcData} grpcData
* @constructor
*/
function Layer(id, grpcData) {
this._id = id;
this._grpcData = grpcData;
}
Layer.prototype.getId = function() {
return this._id;
};
Layer.prototype.getGrpcData = function() {
return this._grpcData;
};
let Layers = function() {
this.DB = new Layer(1, TraceSegmentServiceParameters.SpanLayer.Database);
this.RPC_FRAMEWORK = new Layer(2, TraceSegmentServiceParameters.SpanLayer.RPCFRAMEWORK);
this.HTTP = new Layer(3, TraceSegmentServiceParameters.SpanLayer.HTTP);
this.MQ = new Layer(4, TraceSegmentServiceParameters.SpanLayer.MQ);
this.CACHE = new Layer(5, TraceSegmentServiceParameters.SpanLayer.CACHE);
};
Layers.instance = null;
Layers.getInstance = function() {
if (this.instance === null) {
this.instance = new Layers();
}
return this.instance;
};
exports.Layer = Layer;
exports.Layers = Layers.getInstance();
......@@ -20,6 +20,9 @@ module.exports = Span;
const KeyValuePair = require("./key-value-pair");
const LogDataEntity = require("./log-data-entity");
const CommonParameters = require("../network/Common_pb");
const KeyWithStringValueParameters = require("../network/KeyWithStringValue_pb");
const TraceSegmentServiceParameters = require("../network/TraceSegmentService_pb");
/**
*
......@@ -39,6 +42,9 @@ function Span(spanOptions, traceContext) {
this._startTime = undefined;
this._endTime = undefined;
this._isError = false;
this._spanLayer = undefined;
this._componentId = undefined;
this._componentName = undefined;
this._refs = [];
this._logs = [];
this._traceContext = traceContext;
......@@ -46,11 +52,11 @@ function Span(spanOptions, traceContext) {
}
Span.prototype.start = function() {
this._startTime = process.uptime();
this._startTime = new Date().getTime();
};
Span.prototype.finish = function() {
this._endTime = process.uptime() - this._startTime;
this._endTime = new Date().getTime();
};
Span.prototype.tag = function(key, value) {
......@@ -90,6 +96,18 @@ Span.prototype.isLocalSpan = function() {
return this._spanType == "LOCAL";
};
Span.prototype.component = function(component) {
if (typeof component == "OfficeComponent") {
this._componentId = component.getId();
} else {
this._componentName = component;
}
};
Span.prototype.spanLayer = function(spanLayer) {
this._spanLayer = spanLayer;
};
Span.prototype.fetchPeerInfo = function(registerCallback, unregisterCallback) {
if (this._peerHost) {
return unregisterCallback(this._peerHost);
......@@ -110,3 +128,71 @@ Span.prototype.fetchOperationNameInfo = function(
return registerCallback(this._operationId);
}
};
Span.prototype.component = function(component) {
this._componentId = component.getId();
};
Span.prototype.transform = function() {
let serializedSpan = new TraceSegmentServiceParameters.SpanObject();
serializedSpan.setSpanid(this._spanId);
serializedSpan.setParentspanid(this._parentSpanId);
serializedSpan.setStarttime(this._startTime);
serializedSpan.setEndtime(this._endTime);
this._refs.forEach(function(ref) {
serializedSpan.addRefs(ref.transform());
});
if (this._operationName) {
serializedSpan.setOperationname(this._operationName);
} else {
serializedSpan.setOperationnameid(this._operationId);
}
if (this._peerHost) {
serializedSpan.setPeer(this._peerHost);
} else if (this._peerId) {
serializedSpan.setPeerid(this._peerId);
}
if (this.isExitSpan()) {
serializedSpan.setSpantype(CommonParameters.SpanType.EXIT);
} else if (this.isEntrySpan()) {
serializedSpan.setSpantype(CommonParameters.SpanType.ENTRY);
} else {
serializedSpan.setSpantype(CommonParameters.SpanType.LOCAL);
}
serializedSpan.setIserror(this._isError);
this._tags.forEach(function(tag) {
let serializedTag = new KeyWithStringValueParameters.KeyWithStringValue();
serializedTag.setKey(tag.getKey());
serializedTag.setValue(tag.getValue());
serializedSpan.addTags(serializedTag);
});
this._logs.forEach(function(log) {
let serializedLog = new TraceSegmentServiceParameters.LogMessage();
serializedLog.setTime(log.getTimestamp());
log.getData().forEach(function(data) {
let serializedData = new KeyWithStringValueParameters.KeyWithStringValue();
serializedData.setKey(data.getKey());
serializedData.setValue(data.getValue());
serializedLog.addData(serializedData);
});
serializedSpan.addLogs(serializedLog);
});
if (this._componentName) {
serializedSpan.setComponent(this._componentName);
} else {
serializedSpan.setComponentid(this._componentId);
}
if (this._spanLayer) {
serializedSpan.setSpanlayer(this._spanLayer.getGrpcData());
}
return serializedSpan;
};
......@@ -45,6 +45,7 @@ function TraceContext(parentTraceContext, spanOptions) {
parentTraceContext.spanId() :
-1;
this._span = new Span(spanOptions, this);
this._span.start();
}
TraceContext.prototype.span = function() {
......@@ -93,7 +94,7 @@ TraceContext.prototype.inject = function(contextCarrier) {
});
entryApplicationInstanceId = AgentConfig.getApplicationInstanceId();
primaryDistributedTraceId = traceSegment.traceSegmentId();
primaryDistributedTraceId = traceSegment.traceSegmentId().encode();
});
this._span.fetchOperationNameInfo(function(operationId) {
......
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
module.exports = ID;
const AgentConfig = require("../config");
const process = require("process");
/**
* @param {idParts} idParts
* @constructor
* @author zhang xin
*/
function ID(idParts) {
if (idParts) {
this._part1 = idParts.part1;
this._part2 = idParts.part2;
this._part3 = idParts.part3;
} else {
this._part1 = AgentConfig.getApplicationInstanceId();
this._part2 = process.pid *
process.ppid ? process.ppid : (((1 + Math.random()) * 0x10000) | 0)
+ (((1 + Math.random()) * 0x10000) | 0);
this._part3 = Number(process.hrtime().join(""));
}
}
ID.prototype.toString = function() {
return this._part1 + "." + this._part2 + "." + this._part3;
};
ID.prototype.encode = function() {
return this.toString();
};
ID.prototype.part1 = function() {
return this._part1;
};
ID.prototype.part2 = function() {
return this._part2;
};
ID.prototype.part3 = function() {
return this._part3;
};
......@@ -17,6 +17,9 @@
module.exports = TraceSegmentRef;
const ID = require("./trace-segment-id");
const TraceSegmentServiceParameters = require("../network/TraceSegmentService_pb");
/**
*
* @param {contextCarrier} contextCarrier
......@@ -24,29 +27,36 @@ module.exports = TraceSegmentRef;
* @author zhang xin
*/
function TraceSegmentRef(contextCarrier) {
let that = this;
this.type = "CROSS_PROCESS";
this._traceSegmentId = contextCarrier.getTraceSegmentId();
let idSegment = contextCarrier.getTraceSegmentId().split(".");
this._traceSegmentId = new ID({
part1: idSegment[0],
part2: idSegment[1],
part3: idSegment[2],
});
this._spanId = contextCarrier.getSpanId();
this._entryApplicationInstanceId = contextCarrier.getEntryApplicationInstanceId();
this._parentApplicationInstanceId = contextCarrier.getParentApplicationInstanceId();
contextCarrier.fetchPeerHostInfo(function(peerHost) {
this._peerHost = peerHost;
}, function(peerId) {
this._peerId = peerId;
contextCarrier.fetchPeerHostInfo(function(peerId) {
that._peerId = peerId;
}, function(peerHost) {
that._peerHost = peerHost;
});
contextCarrier.fetchEntryOperationNameInfo(function(operationName) {
this._entryOperationName = operationName;
}, function(operationId) {
this._entryOperationId = operationId;
contextCarrier.fetchEntryOperationNameInfo(function(operationId) {
that._entryOperationId = operationId;
}, function(operationName) {
that._entryOperationName = operationName;
});
contextCarrier.fetchParentOperationNameInfo(function(parentOperationName) {
this._parentOperationName = parentOperationName;
}, function(parentOperationId) {
this._parentOperationId = parentOperationId;
contextCarrier.fetchParentOperationNameInfo(function(parentOperationId) {
that._parentOperationId = parentOperationId;
}, function(parentOperationName) {
that._parentOperationName = parentOperationName;
});
}
......@@ -68,3 +78,47 @@ TraceSegmentRef.prototype.fetchEntryOperationNameInfo = function(
TraceSegmentRef.prototype.getEntryApplicationInstanceId = function() {
return this._entryApplicationInstanceId;
};
TraceSegmentRef.prototype.transform = function() {
let serializedTraceSegmentRef = new TraceSegmentServiceParameters.TraceSegmentReference();
serializedTraceSegmentRef.setReftype(TraceSegmentServiceParameters.RefType.CROSSPROCESS);
/**
* @param {traceSegmentId} traceSegmentId
* @return {TraceSegmentServiceParameters.UniqueId}
*/
function buildUniqueId(traceSegmentId) {
let serializedUniqueId = new TraceSegmentServiceParameters.UniqueId();
serializedUniqueId.addIdparts(traceSegmentId.part1());
serializedUniqueId.addIdparts(traceSegmentId.part2());
serializedUniqueId.addIdparts(traceSegmentId.part3());
return serializedUniqueId;
}
serializedTraceSegmentRef.setParenttracesegmentid(buildUniqueId(this._traceSegmentId));
serializedTraceSegmentRef.setParentspanid(this._spanId);
serializedTraceSegmentRef.setParentapplicationinstanceid(this._parentApplicationInstanceId);
if (this._peerHost) {
serializedTraceSegmentRef.setNetworkaddress(this._peerHost);
} else {
serializedTraceSegmentRef.setNetworkaddressid(this._peerId);
}
serializedTraceSegmentRef.setEntryapplicationinstanceid(this._entryApplicationInstanceId);
if (this._entryOperationName) {
serializedTraceSegmentRef.setEntryservicename(this._entryOperationName);
} else {
serializedTraceSegmentRef.setEntryserviceid(this._entryOperationId);
}
if (this._parentOperationName) {
serializedTraceSegmentRef.setParentservicename(this._parentOperationName);
} else {
serializedTraceSegmentRef.setParentserviceid(this._parentOperationId);
}
return serializedTraceSegmentRef;
};
......@@ -19,12 +19,16 @@
module.exports = TraceSegment;
const ID = require("./trace-segment-id");
const AgentConfig = require("../config");
const traceSegmentCache = require("../cache");
const TraceSegmentServiceParameters = require("../network/TraceSegmentService_pb");
/**
* @author zhang xin
*/
function TraceSegment() {
this._traceSegmentId = (((1 + Math.random()) * 0x10000) | 0).toString(16).
substring(1);
this._traceSegmentId = new ID();
this._finishedSpan = [];
this._runningSpanSize = 0;
this._spanIdGenerator = 0;
......@@ -38,13 +42,15 @@ TraceSegment.prototype.traceSegmentId = function() {
};
TraceSegment.prototype.archive = function(span) {
this._finishedSpan.push(span);
if ((--this._runningSpanSize) == 0) {
this.finish();
}
this._finishedSpan.push(span);
};
TraceSegment.prototype.finish = function() {
traceSegmentCache.put(this);
};
TraceSegment.prototype.generateSpanId = function(spanOptions, callback) {
......@@ -79,3 +85,39 @@ TraceSegment.prototype.fetchEntryOperationNameInfo = function(
return registerCallback(this._entryOperationId);
}
};
TraceSegment.prototype.transform = function() {
let serializedSegment = new TraceSegmentServiceParameters.UpstreamSegment();
let serializeTraceSegmentObject = new TraceSegmentServiceParameters.TraceSegmentObject();
/**
* @param {traceSegmentId} traceSegmentId
* @return {TraceSegmentServiceParameters.UniqueId}
*/
function buildTraceSegmentId(traceSegmentId) {
let serializeTraceSegmentId = new TraceSegmentServiceParameters.UniqueId();
serializeTraceSegmentId.addIdparts(traceSegmentId.part1());
serializeTraceSegmentId.addIdparts(traceSegmentId.part2());
serializeTraceSegmentId.addIdparts(traceSegmentId.part3());
return serializeTraceSegmentId;
}
serializeTraceSegmentObject.setTracesegmentid(buildTraceSegmentId(this._traceSegmentId));
this._finishedSpan.forEach(function(span) {
serializeTraceSegmentObject.addSpans(span.transform());
});
serializeTraceSegmentObject.setApplicationid(AgentConfig.getApplicationId());
serializeTraceSegmentObject.setApplicationinstanceid(AgentConfig.getApplicationInstanceId());
serializeTraceSegmentObject.setIssizelimited(false);
if (this._refs.length > 0) {
this._refs.forEach(function(ref) {
serializedSegment.addGlobaltraceids(buildTraceSegmentId(ref.getTraceSegmentId()));
});
} else {
serializedSegment.addGlobaltraceids(buildTraceSegmentId(this._traceSegmentId));
}
serializedSegment.setSegment(serializeTraceSegmentObject.serializeBinary());
return serializedSegment;
};
......@@ -32,9 +32,12 @@
},
"homepage": "https://github.com/OpenSkywalking/skywalking-nodejs#readme",
"dependencies": {
"async": "^2.6.0",
"debug": "^3.1.0",
"end-of-stream": "^1.4.1",
"grpc": "^1.10.1"
"google-protobuf": "^3.5.0",
"grpc": "^1.10.1",
"uuid": "^3.2.1"
},
"devDependencies": {
"eslint": "^4.19.1",
......@@ -46,5 +49,7 @@
"run-script-os": "^1.0.3",
"semver": "^5.5.0"
},
"eslintIgnore": ["lib/network/*.js"]
"eslintIgnore": [
"lib/network/*.js"
]
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册