未验证 提交 cf0a696e 编写于 作者: Y youyong205 提交者: GitHub

Merge pull request #1315 from stdrickforce/nodejs

nodecat initialization
...@@ -38,7 +38,7 @@ endmacro() ...@@ -38,7 +38,7 @@ endmacro()
use_c99() use_c99()
use_cxx11() use_cxx11()
set(BUILD_TYPE CPP) # set(BUILD_TYPE CPP)
# set(BUILD_TEST 1) # set(BUILD_TEST 1)
# set(BUILD_SCRIPT 1) # set(BUILD_SCRIPT 1)
......
...@@ -74,17 +74,8 @@ void test1() { ...@@ -74,17 +74,8 @@ void test1() {
} }
void test2() { void test2() {
for (int i = 0; i < 1000; i++) { CatTransaction *t1 = newTransaction("Test2", "A");
CatTransaction *t1 = newTransaction("Test2", "A"); t1->complete(t1);
CatTransaction *t2 = newTransaction("Test2", "B");
t2->setStatus(t2, CAT_SUCCESS);
t2->addData(t2, "Body");
t2->complete(t2);
t1->setStatus(t1, CAT_SUCCESS);
t1->complete(t1);
}
} }
void test3() { void test3() {
...@@ -100,9 +91,9 @@ int main(int argc, char **argv) { ...@@ -100,9 +91,9 @@ int main(int argc, char **argv) {
CatClientConfig config = DEFAULT_CCAT_CONFIG; CatClientConfig config = DEFAULT_CCAT_CONFIG;
config.enableHeartbeat = 0; config.enableHeartbeat = 0;
config.enableDebugLog = 1; config.enableDebugLog = 1;
catClientInitWithConfig("ccat", &config); catClientInitWithConfig("nodecat", &config);
test(); test2();
Sleep(3000); Sleep(5000);
catClientDestroy(); catClientDestroy();
return 0; return 0;
} }
...@@ -106,7 +106,7 @@ int sendRootMessage(CatMessageTree *tree) { ...@@ -106,7 +106,7 @@ int sendRootMessage(CatMessageTree *tree) {
if (!tree->canDiscard) { if (!tree->canDiscard) {
return mqOffer(tree); return mqOffer(tree);
} else if (g_config.enableSampling && !hitSample()) { } else if (g_config.enableSampling && hitSample()) {
return mqOffer(tree); return mqOffer(tree);
} else { } else {
sendToAggregator(tree); sendToAggregator(tree);
......
project(nodecat)
# This CMakeLists.txt is only for development.
# Please input following commands to build a runnable version:
# node-gyp configure
# node-gyp build
cmake_minimum_required(VERSION 2.4)
macro(use_cxx11)
include(CheckCXXCompilerFlag)
CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
if(COMPILER_SUPPORTS_CXX11)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
elseif(COMPILER_SUPPORTS_CXX0X)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
else()
message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
endif()
endmacro(use_cxx11)
use_cxx11()
if (APPLE)
set(CMAKE_MACOSX_RPATH 1)
endif()
include(NodeJS.cmake)
nodejs_init()
include_directories(include)
set(
SOURCE_FILES
src/nodecat.cc
)
set(
HEADER_FILES
include/client.h
src/debug.h
)
link_libraries(catclient)
add_nodejs_module(nodecat ${SOURCE_FILES} ${HEADER_FILES})
\ No newline at end of file
build:
node-gyp configure
node-gyp build
此差异已折叠。
# Cat client for Node.js
`nodecat` supports node v8+.
## Requirements
The `nodecat` required `libcatclient.so` installed in `LD_LIBRARY_PATH`.
Please refer to [ccat installation](../c/README.md) for further information.
## Installation
### via npm
```bash
npm install nodecat
```
## Initialization
First of all, you have to create `/data/appdatas/cat` directory, read and write permission is required (0644).`/data/applogs/cat` is also required if you'd like to preserve a debug log, it can be very useful while debugging.
And create a config file `/data/appdatas/cat/client.xml` with the following contents.
```xml
<?xml version="1.0" encoding="utf-8"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema" xsi:noNamespaceSchemaLocation="config.xsd">
<servers>
<server ip="<cat server ip address>" port="2280" http-port="8080" />
</servers>
</config>
```
Don't forget to change the `<cat server IP address>` to your own after you copy and paste the contents.
After you've done all things above, `nodecat` can be initialized with following codes:
```js
var cat = require('nodecat')
cat.init({
appkey: 'your-appkey'
})
```
> Only English characters, numbers, underscore and dash is allowed in appkey.
## Documentation
### Transaction
```js
let t = cat.newTransaction('foo', 'bar')
setTimeout(() => t.complete(), 3000)
```
#### Transaction apis
We offered a list of APIs to modify the transaction.
* addData
* setStatus
* complete
> You can call `addData` several times, the added data will be connected by `&`.
Here is an exapmle:
```js
let t = cat.newTransaction('foo', 'bar')
t.addData("key", "val")
t.addData("context")
t.setStatus(cat.STATUS.SUCCESS)
setTimeout(() => t.complete(), 3000)
```
### Event
#### logEvent
Log an event.
```js
// Log a event with success status and empty data.
cat.logEvent("Event", "E1")
// The third parameter (status) is optional, default by "0".
// It can be any of string value.
// The event will be treated as "problem" unless the given status == cat.STATUS.SUCCESS ("0")
// which will be recorded in our problem report.
cat.logEvent("Event", "E2", cat.STATUS.FAIL)
cat.logEvent("Event", "E3", "failed")
// The fourth parameter (data) is optional, default by "".
// It can be any of string value.
cat.logEvent("Event", "E4", "failed", "some debug info")
// The fourth parameter (data) can also be an object
// In this case, the object will be dumped into json.
cat.logEvent("Event", "E5", "failed", {a: 1, b: 2})
```
#### logError
Log an error with error stack.
Error is a special event, with `type = Exception` and `name` is given by the 1st parameter.
And error stacktrace will be added to `data`, which is always useful in debugging.
```js
cat.logError('ErrorInTransaction', new Error())
```
\ No newline at end of file
{
"targets": [
{
"target_name": "nodecat",
"sources": [
"src/nodecat.cc",
],
"include_dirs": [
"include"
],
"libraries": [
"-lcatclient"
]
}
]
}
var cat = require('../lib')
cat.init({
appkey: 'nodecat'
})
for (var i = 0; i < 10; i++) {
// Log a event with success status and empty data.
cat.logEvent("Event", "E1")
// The third parameter (status) is optional, default by "0".
// It can be any of string value.
// The event will be treated as "problem" unless the given status == cat.STATUS.SUCCESS ("0")
// which will be recorded in our problem report.
cat.logEvent("Event", "E2", cat.STATUS.FAIL)
cat.logEvent("Event", "E3", "failed")
// The fourth parameter (data) is optional, default by "".
// It can be any of string value.
cat.logEvent("Event", "E4", "failed", "some debug info")
// The fourth parameter (data) can also be an object
// In this case, the object will be dumped into json.
cat.logEvent("Event", "E5", "failed", {a: 1, b: 2})
}
let cat = require('../lib')
cat.init({
appkey: 'nodecat'
})
let a = cat.newTransaction('Trans', 'A')
let b = cat.newTransaction('Trans', 'B')
let c = cat.newTransaction('Trans', 'C')
setTimeout(function() {
a.complete()
}, 1000)
setTimeout(function() {
b.complete()
}, 2000)
setTimeout(function() {
c.complete()
}, 3000)
'use strict'
let cat = require('../lib')
cat.init({
appkey: 'nodecat'
})
let t = cat.newTransaction('foo', 'bar')
cat.logEvent('EventInTransaction', 'T1')
cat.logError('ErrorInTransaction', new Error())
t.addData("key", "val")
t.addData("context")
t.setStatus(cat.STATUS.SUCCESS)
t.logEvent('childEvent', '1')
t.logError('childError', new Error())
setTimeout(() => t.complete(), 1000)
#ifndef CAT_CLIENT_C_CLIENT_H
#define CAT_CLIENT_C_CLIENT_H
#include <string.h>
#include <stdlib.h>
#include <errno.h>
/**
* Constants
*/
#define CAT_CLIENT_EXPORT
#define CAT_SUCCESS "0"
#define CAT_FAIL "-1"
#define CAT_ERROR "ERROR"
#define CAT_ENCODER_TEXT 0
#define CAT_ENCODER_BINARY 1
/**
* C Struct Definitions
*/
typedef struct _CatMessage CatMessage;
typedef struct _CatMessage CatEvent;
typedef struct _CatMessage CatMetric;
typedef struct _CatMessage CatHeartBeat;
typedef struct _CatTransaction CatTransaction;
struct _CatMessage {
/**
* Add some debug data to a message.
* It will be shown in the log view page.
* Note not all the data will be preserved. (If sampling is enabled)
*/
void (*addData)(CatMessage *message, const char *data);
/**
* Add some debug data to a message, in key-value format.
*/
void (*addKV)(CatMessage *message, const char *dataKey, const char *dataValue);
/**
* Set the status of a message
* Note that any status not equal to "0" (CAT_SUCCESS) will be treated as a "problem".
* And can be recorded in our problem report.
* A message tree which contains a "problem" message won't be sampling.
* In some cases, especially in high concurrency situations, it may cause network problems.
*/
void (*setStatus)(CatMessage *message, const char *status);
/**
* Set the created timestamp of a transaction
*/
void (*setTimestamp)(CatMessage *message, unsigned long long timestamp);
/**
* Complete the message.
* Meaningless in most cases, only transaction has to be completed manually.
*/
void (*complete)(CatMessage *message);
};
struct _CatTransaction {
/**
* Add some debug data to a transaction.
* It will be shown in the log view page.
* Note not all the data will be preserved. (If sampling is enabled)
*/
void (*addData)(CatTransaction *transaction, const char *data);
/**
* Add some debug data to a transaction, in key-value format.
*/
void (*addKV)(CatTransaction *transaction, const char *dataKey, const char *dataValue);
/**
* Set the status of a transaction
* Note that any status not equal to "0" (CAT_SUCCESS) will be treated as a "problem".
* And can be recorded in our problem report.
* A message tree which contains a "problem" transaction won't be sampling.
* In some cases, especially in high concurrency situations, it may cause network problems.
*/
void (*setStatus)(CatTransaction *transaction, const char *status);
/**
* Set the created timestamp of a transaction
*/
void (*setTimestamp)(CatTransaction *transaction, unsigned long long timestamp);
/**
* Complete the transaction
*/
void (*complete)(CatTransaction *transaction);
/**
* Add a child directly to a transaction.
* Avoid of using this api unless you really have to do so.
*/
void (*addChild)(CatTransaction *transaction, CatMessage *message);
/**
* Set the duration of a transaction.
* The duration will be calculated when the transaction has been completed.
* You can prevent it and specified the duration through this api.
*/
void (*setDurationInMillis)(CatTransaction* transaction, unsigned long long duration);
/**
* Set the duration start of a transaction.
* The duration start is same as timestamp by default.
* You can overwrite it through this api, which can influence the calculated duration.
* When a transaction is completed, the duration will be set to (currentTimestamp - durationStart)
* Note that it only works when duration has not been specified.
*/
void (*setDurationStart)(CatTransaction* transaction, unsigned long long durationStart);
};
typedef struct _CatClientConfig {
int encoderType;
int enableHeartbeat;
int enableSampling;
int enableMultiprocessing;
int enableDebugLog;
} CatClientConfig;
extern CatClientConfig DEFAULT_CCAT_CONFIG;
#ifdef __cplusplus
extern "C" {
#endif
/**
* Common Apis
*/
CAT_CLIENT_EXPORT int catClientInit(const char *appkey);
CAT_CLIENT_EXPORT int catClientInitWithConfig(const char *appkey, CatClientConfig* config);
CAT_CLIENT_EXPORT int catClientDestroy();
CAT_CLIENT_EXPORT int isCatEnabled();
/**
* Transaction Apis
*/
CAT_CLIENT_EXPORT CatTransaction *newTransaction(const char *type, const char *name);
/**
* Create a transaction with specified duration in milliseconds.
*
* This is equivalent to
*
* CatTransaction *t = newTransaction("type", "name");
* t->setDurationInMillis(t4, 1000);
* return t;
*/
CAT_CLIENT_EXPORT CatTransaction *newTransactionWithDuration(const char *type, const char *name, unsigned long long duration);
/**
* Log a transaction with specified duration in milliseconds.
*
* Due to the transaction has been auto completed,
* the duration start and the created timestamp will be turn back.
*
* This is equivalent to
*
* CatTransaction *t = newTransaction("foo", "bar2-completed");
* t->setTimestamp(t, GetTime64() - 1000);
* t->setDurationStart(t, GetTime64() - 1000);
* t->setDurationInMillis(t, 1000);
* t->complete(t);
* return;
*/
CAT_CLIENT_EXPORT void newCompletedTransactionWithDuration(const char *type, const char *name, unsigned long long duration);
/**
* Event Apis
*/
CAT_CLIENT_EXPORT void logEvent(const char *type, const char *name, const char *status, const char *data);
/**
* Log a error message.
*
* This is equivalent to
*
* logEvent("Exception", msg, CAT_ERROR, errStr);
*/
CAT_CLIENT_EXPORT void logError(const char *msg, const char *errStr);
/**
* Create a event message manually.
*
* Avoid using this api unless you really have to.
*/
CAT_CLIENT_EXPORT CatEvent *newEvent(const char *type, const char *name);
/**
* Heartbeat Apis
*/
/**
* Create a heartbeat message manually.
*
* Heartbeat is reported by cat client automatically,
* so you don't have to use this api in most cases,
* unless you want to overwrite our heartbeat message.
*
* Don't forget to disable our built-in heartbeat if you do so.
*/
CAT_CLIENT_EXPORT CatHeartBeat *newHeartBeat(const char *type, const char *name);
/**
* Metric Apis
*/
CAT_CLIENT_EXPORT void logMetricForCount(const char *name, int quantity);
CAT_CLIENT_EXPORT void logMetricForDuration(const char *name, unsigned long long duration);
/**
* MessageId Apis
*/
CAT_CLIENT_EXPORT char *createMessageId();
CAT_CLIENT_EXPORT char *createRemoteServerMessageId(const char *appkey);
CAT_CLIENT_EXPORT char *getThreadLocalMessageTreeId();
CAT_CLIENT_EXPORT char *getThreadLocalMessageTreeRootId();
CAT_CLIENT_EXPORT char *getThreadLocalMessageTreeParentId();
CAT_CLIENT_EXPORT void setThreadLocalMessageTreeId(char *messageId);
CAT_CLIENT_EXPORT void setThreadLocalMessageTreeRootId(char *messageId);
CAT_CLIENT_EXPORT void setThreadLocalMessageTreeParentId(char *messageId);
#ifdef __cplusplus
}
#endif
#endif //CAT_CLIENT_C_CLIENT_H
'use strict'
exports.implement = require('./implements/ccat')
'use strict'
const implement = require('./adapter').implement
const EventMessage = require('./message/event')
const TransactionMessage = require('./message/transaction')
const TreeManager = require('./message/org/tree-manager')
const STATUS = require('./constant').STATUS
const system = require('./system')
let isInitialized = false;
class TransactionHandler {
constructor(transactionMessage, catInstance) {
this.message = transactionMessage
this.cat = catInstance
this.treeManager = catInstance.treeManager
this.type = transactionMessage.type
this.name = transactionMessage.name
}
/**
* 设置transaction状态
* @param {string} status , STATUS
*/
setStatus(status) {
this.message.status = '' + status
}
/**
* Add data to a transaction.
* 序列化成 query的形式 &key=value
* 允许多次addData
* @param {string} key , 如果value存在则作为key,否则作为完整的data
* @param {string} [value]
*/
addData(key, value) {
let data = stringifyData(key)
if (value !== undefined) {
data = data + '=' + stringifyData(value)
}
if (this.message.data) {
this.message.data += '&' + data
} else {
this.message.data += data
}
}
/**
* end a transaction.
* @param {number} maxTime
*/
complete(maxTime) {
this.treeManager.endMessage(this.message, maxTime)
}
/**
* logEvent , 同cat.logEvent , 但确保挂在此transaction下
*/
logEvent(type, name, status, data) {
let message = this.message
if (message.isEnd) {
return this.cat.logEvent(type, name, status, data)
} else {
message.addChild(createEvent(type, name, status, data))
}
}
/**
* logError , 同cat.logError , 但确保挂在此transaction下
*/
logError(name, error) {
let message = this.message
if (message.isEnd) {
return this.cat.logError(name, error)
} else {
message.addChild(createError(name, error))
}
}
}
/**
* Class Cat
* 暴露给用户的API在这边,以这里的参数说明为准
* */
class Cat {
constructor() {
this.STATUS = STATUS
this.treeManager = new TreeManager(implement)
}
/**
* @param {object} options
* {string} options.appkey
*/
init(options) {
if (isInitialized) {
return
}
isInitialized = true
let logger = require('./logger')('index')
options = options || {}
logger.info('Cat Version : ' + require('../package').version)
let appkey = options.appkey
if (!appkey) {
logger.info('Appkey is required')
return
}
logger.info('Appkey has been set to ' + appkey)
implement.init(appkey)
system.collectStart(this)
}
/**
* @param {string} type 一级名称
* @param {string} name 二级名称
* @param {string} [status] 状态, 参见 STATUS
* @param {string} [data] 数据
*/
logEvent(type, name, status, data) {
this.treeManager.addMessage(createEvent(type, name, status, data))
}
/**
* @param {string} [name]
* @param {Error} error
*/
logError(name, error) {
this.treeManager.addMessage(createError(name, error))
}
/**
* 同CatInterface
* @param {string} type
* @param {string} name
*/
newTransaction(type, name) {
type = '' + type
name = '' + name
var message = new TransactionMessage({
type: type,
name: name
})
this.treeManager.addMessage(message)
let t = new TransactionHandler(message, this)
return t
}
}
function createEvent(type, name, status, data) {
type = '' + type
name = '' + name
data = stringifyData(data)
status = status ? ('' + status) : STATUS.SUCCESS
return new EventMessage({
type: type,
name: name,
status: status,
data: data
})
}
function createError(name, error) {
if (name instanceof Error) {
error = name
name = null
}
name = name || (error && error.name) || 'Error'
let stack = error ? error.stack : ''
let errStr = name ? (name + ' ' + stack) : stack
return createEvent('Error', name, 'ERROR', errStr)
}
function stringifyData(data) {
if (data === undefined) {
data = ''
}
if (typeof data !== 'string') {
// 有data参数,但不是字符串
try {
data = JSON.stringify(data)
} catch (e) {
data = data.toString ? data.toString() : ''
}
}
return data
}
module.exports = Cat
'use strict'
const os = require('os')
const logger = require('./logger.js')('config')
const getLocalIP = () => {
var ip = process.env.HOST_IP
if (!ip) {
var interfaces = os.networkInterfaces()
var addresses = []
for (var k in interfaces) {
for (var k2 in interfaces[k]) {
var address = interfaces[k][k2]
if (address.family === 'IPv4' && !address.internal) {
addresses.push(address.address)
}
}
}
addresses.length && (ip = addresses[0])
}
logger.info('Get local ip ' + ip)
return ip
}
// exports
var config = {
maxMessageLength: 2000,
hostname: os.hostname(),
domain: 'node-cat',
ip: getLocalIP()
}
module.exports = function () {
return config
}
module.exports.setDomain = function (domain) {
config.domain = domain
}
'use strict'
exports.STATUS = {
SUCCESS: '0',
FAIL: '1'
}
'use strict'
const CCatApi = require('../../build/Release/nodecat')
const Event = require('../message/event')
exports.init = (appKey) => {
CCatApi.init(appKey)
process.on('exit', (code) => {
try {
CCatApi.destroy()
} catch (e) {
}
})
}
function countTree(message) {
let count = 1
message.children.forEach(child => (count += countTree(child)))
return count
}
function position(count) {
const array = [200, 500, 1000]
for (let i = 0; i < array.length; i++) {
if (count < array[i]) {
return '<' + array[i]
}
}
return '>' + array[array.length - 1]
}
exports.sendTree = (tree) => {
if (tree.root.messageType === 'transaction') {
let count = countTree(tree.root)
let event = new Event({
type: 'TreeCount',
name: position(count),
data: '' + count,
status: '0'
})
tree.root.addChild(event)
}
CCatApi.sendTree(tree)
}
const Cat = require('./cat')
/**
* exports 一个默认Cat的实例 , 方便直接使用
*/
module.exports = new Cat()
module.exports.Cat = Cat
module.exports = function (filename) {
return {
error: function (msg) {
console.log(msg);
},
info: function (msg) {
console.log(msg);
},
warn: function (msg) {
console.log(msg);
}
}
}
\ No newline at end of file
'use strict'
var Message = require('./message')
/**
* Get an instance of an event
* CatEvent is inherited from the Message.
*
* @param {object} event options.
*/
class CatEvent extends Message {
constructor(options) {
super(options)
this.messageType = 'event'
this.begin()
this.end()
}
}
module.exports = CatEvent
'use strict'
var Message = require('./message')
var config = require('../config')()
class Heartbeat extends Message {
constructor(options) {
super(options)
this.type = 'Heartbeat'
this.name = config.ip
this.status = '0'
this.messageType = 'heartbeat'
this.begin()
this.end()
}
}
module.exports = Heartbeat
'use strict'
var HOUR = 3600 * 1000
var cluster = require('cluster')
var config = require('../config')()
var assert = require('assert')
var os = require('os')
var cpuCount = os.cpus().length
var seq = initialSeq()
var hourTS
var defaultIpHex
if (config.ip) {
var ips = config.ip.split('.')
assert.equal(ips.length, 4, 'ip must contains 4 groups')
var buffer = new Buffer(4)
for (var i = 0; i < 4; ++i) {
buffer.writeUInt8(parseInt(ips[i]), i)
}
defaultIpHex = ''
for (var j = 0; j < buffer.length; j++) {
var b = buffer.readUInt8(j)
defaultIpHex += ((b >> 4) & 0x0f).toString(16)
defaultIpHex += (b & 0x0f).toString(16)
}
}
module.exports.nextId = function (domain) {
var ts = Math.floor(Date.now() / HOUR)
if (ts != hourTS) {
seq = initialSeq()
hourTS = ts
}
seq += cpuCount
// first character is clusterId, will be different between cluster processes
return [domain || config.domain, defaultIpHex, ts, '' + seq].join('-')
}
module.exports.MessageId = MessageId
function MessageId(domain, hexIp, timestamp, index) {
this.domain = domain
this.hexIp = hexIp
this.timestamp = timestamp
this.index = index
}
module.exports.parse = function (messageId) {
if (!messageId) return null
var list = messageId.split('-')
var len = list.length
if (len >= 4) {
var ipAddressInHex = list[len - 3]
var timestamp = parseInt(list[len - 2])
var index = parseInt(list[len - 1])
var domain = list.splice(0, len - 3).join('-')
return new MessageId(domain, ipAddressInHex, timestamp, index)
}
return null
}
module.exports.getIpAddress = function () {
var local = this.hexIp
var i = []
for (var i = 0, len = local.length; i < len; i += 2) {
var first = local.charAt(i)
var next = local.charAt(i + 1)
var temp = 0
if (first >= '0' && first <= '9') {
temp += (first - '0') << 4
} else {
temp += ((first - 'a') + 10) << 4
}
if (next >= '0' && next <= '9') {
temp += next - '0'
} else {
temp += (next - 'a') + 10
}
i.push(temp)
}
return i.join('.')
}
function initialSeq() {
let seq = 0
if (process.env && process.env.pm_id) {
seq = process.env.pm_id % cpuCount
} else {
if (cluster.isWorker) {
seq = cluster.worker.id % cpuCount
}
}
return seq
}
'use strict'
/**
* Get an instance of a basic message.
*
* @param {object} options Message options for initialization.
*/
class Message {
constructor(options) {
options = options || {}
this.type = options.type || undefined
this.name = options.name || undefined
this.status = options.status || '0'
this.beginTime = options.beginTime || new Date()
this.beginTimestamp = +this.beginTime
this.endTime = options.endTime || new Date()
this.endTimestamp = +this.endTime
this.data = options.data || ''
this.children = options.children || []
this.parent = null
// this.uid = options.uid || undefined;
// this.uid = rand.generate();
this.isBegin = options.isBegin || false
this.isEnd = options.isEnd || false
this.allEnd = false
this.tree = null // 如果作为tree的根节点,这个属性作为索引
// this.puid = options.puid || undefined;
this.messageType = 'message' // 子类复写
}
addOptions(options) {
Object.keys(options).forEach(prop => {
if (prop === 'data') {
if (this[prop] === undefined || this[prop] === null) {
this[prop] = options[prop]
} else {
this[prop] = this[prop] + options[prop]
}
} else {
this[prop] = options[prop]
}
})
}
begin() {
if (this.isBegin) {
return
}
this.isBegin = true
this.beginTime = new Date()
this.beginTimestamp = +this.beginTime
}
end(maxTime) {
if (this.isEnd) {
return
}
this.isEnd = true
let now = new Date()
if (maxTime) {
if (now - this.beginTime > maxTime) {
this.endTime = new Date(+this.beginTime + maxTime)
} else {
this.endTime = now
}
} else {
this.endTime = now
}
this.endTimestamp = +this.endTime
}
addChild(message) {
var self = this
Array.prototype.forEach.call(arguments, message => {
self.children.push(message)
message.parent = self
// 如果当前的已经结束,但是加进来的节点没有end
if (self.allEnd && !message.isAllEnd()) {
self.allEnd = false
}
})
}
removeChild() {
var children = this.children
Array.prototype.forEach.call(arguments, message => {
var index = children.indexOf(message)
if (index > -1) {
children.splice(index, 1)
message.parent = null
}
})
this.isAllEnd()
}
/**
* 是否自己和子节点全部都已经End
*/
isAllEnd() {
if (this.allEnd) {
return true
}
if (!this.children.length) {
return this.allEnd = this.isEnd
}
this.allEnd = (this.isEnd && this.children.every(child => child.isAllEnd()))
return this.allEnd
}
}
module.exports = Message
'use strict'
var Tree = require('./tree')
var Event = require('../event')
var Transaction = require('../transaction')
var Heartbeat = require('../heartbeat')
class TreeManager {
constructor(sender) {
// 会出现多个tree的情况
this.trees = []
// 最近一个挂上去的transaction message
this.lastNode = null
this.sender = sender
}
/**
* 添加一个message到tree中,
* 如果是transaction , 构建树结构
* 如果是Event,挂到transaction下面或者直接发送
*/
addMessage(message) {
let lastNode = this._findLastNode()
// Transaction
if (message instanceof Transaction) {
if (!lastNode) {
// 没有树或者已经发送掉了
this.createTree(message)
this.lastNode = message
} else {
// 已经构建过 tree 了,加入到最后一个transaction的子节点
lastNode.addChild(message)
this.lastNode = message
}
message.begin()
} else if (message instanceof Event || message instanceof Heartbeat) {
// Event or Heartbeat
if (!lastNode) {
// 没有构建过树的时候,直接把消息发出去
this.sendTree(new Tree({
root: message
}))
} else {
lastNode.addChild(message)
}
}
}
/**
* 某个transaction结束
* 如果是叶子节点:
* 修改状态
* 通知父节点
* 如果是父节点:
* 判断子节点的transaction是否end,
* 如果没结束,说明add的时候挂的节点是不对的,需要修改树结构
* 如果全都结束,通知父节点
*/
endMessage(message, maxTime) {
// 先end自己
message.end(maxTime)
if (!(message instanceof Transaction)) {
return
}
if (message.isAllEnd()) {
// 如果整个子节点都结束了
this.notifyParentEnd(message)
} else {
// 如果自己结束了,但是子节点没结束,说明子节点中有挂的不对的,不应该挂在自己下面,提到自己并列
let unEndChildren = message.children.filter(child => !child.isAllEnd())
message.removeChild.apply(message, unEndChildren)
if (message.parent) {
message.parent.addChild.apply(message.parent, unEndChildren)
} else {
// 根节点自己结束了,但是有子节点没结束的,为子节点单独创建树
this.sendTree(message.tree)
unEndChildren.forEach(msg => {
this.createTree(msg)
})
}
}
}
notifyParentEnd(message) {
if (message.parent) {
if (message.parent.isEnd) {
// 如果父节点自己已经结束,再end一次,让父节点判断是否全都结束
this.endMessage(message.parent)
} else {
// 什么都不干,等父节点end
}
} else {
// 自己就是根节点
this.sendTree(message.tree)
}
}
sendTree(tree) {
if (!tree) {
return
}
var index = this.trees.indexOf(tree)
if (index > -1) {
this.trees.splice(index, 1)
}
this.sender.sendTree(tree)
}
// 找到最后一个节点,如果this.lastNode是end的,就一直往父节点找
_findLastNode() {
if (!this.lastNode) {
return null
}
var last = this.lastNode
while (last && last.isEnd) {
last = last.parent
}
return last
}
createTree(rootMessage) {
var tree = new Tree({
root: rootMessage
})
this.trees.push(tree)
return tree
}
}
module.exports = TreeManager
'use strict'
var config = require('../../config')()
/**
* Get an instance of a message tree.
*
* @param {object} options Tree options for initialization.
*/
class Tree {
constructor(options) {
this.domain = options.domain || config.domain
this.hostName = options.hostName || config.hostname
this.ip = options.ip || config.ip
this.groupName = options.groupName || config.groupName
this.clusterId = options.clusterId || config.clusterId
this.clusterName = options.clusterName || config.clusterName
this.messageId = options.messageId // set later
this.parentMessageId = options.parentMessageId || this.messageId
this.rootMessageId = options.rootMessageId || this.messageId
this.sessionToken = options.sessionToken || config.sessionToken
this.root = options.root || undefined
if (this.root) {
this.root.tree = this
}
}
}
module.exports = Tree
'use strict'
var Message = require('./message')
/**
* Get an instance of a transaction.
* Transaction is inherited from the message.
*
* @param {object} options.
*/
class Transaction extends Message {
constructor(options) {
super(options)
this.messageType = 'transaction'
}
}
module.exports = Transaction
'use strict'
var moment = require('moment')
function date2str(date) {
return moment(date).format('YYYY-MM-DD HH:mm:ss.SSS')
};
function durationInMillis(start, end) {
return (end - start)
};
function durationInMicros(start, end) {
return (end - start) * 1000
};
module.exports = {
date2str: date2str,
durationInMillis: durationInMillis,
durationInMicros: durationInMicros
}
/**
* user child process to get disk space
* */
'use strict'
const exec = require('../util/shell').exec
const fs = require('fs')
if (process.platform === 'linux' ||
process.platform === 'freebsd' ||
process.platform === 'darwin' ||
process.platform === 'sunos') {
exports.usage = function* (drive) {
if (!fs.existsSync(drive)) {
return null
}
try {
var res = yield exec("df -k '" + drive.replace(/'/g, "'\\''") + "'")
var lines = res.trim().split('\n')
var strDiskInfo = lines[lines.length - 1].replace(/[\s\n\r]+/g, ' ')
var diskInfo = strDiskInfo.split(' ')
return {
available: diskInfo[3] * 1024,
total: diskInfo[1] * 1024,
free: diskInfo[3] * 1024
}
} catch (e) {
return null
}
}
} else {
exports.usage = function* () {
return null
}
}
/**
* 获取系统的memory swap buffer/cache 使用情况
* 单位为byte
* */
'use strict'
var exec = require('../util/shell').exec
var os = require('os')
const DEFAULT_RESULT = {
totalMem: 0,
freeMem: 0,
totalSwap: 0,
freeSwap: 0,
totalCache: 0,
freeCache: 0
}
/**
* return Object
* {
* totalMem: 1,
* freeMem:1,
* totalSwap:1,
* freeSwap:1,
* totalCache: 0,
* freeCache: 0
* }
* */
exports.usage = function* () {
function split(line) {
return line.split(/[\s\n\r]+/)
}
if (process.platform === 'linux' ||
process.platform === 'freebsd' ||
process.platform === 'sunos') {
try {
var res = yield exec('free -m')
var lines = res.trim().split('\n')
var usage = Object.assign({}, DEFAULT_RESULT)
var mem = split(lines[1])
usage.totalMem = mem[1] * 1024 * 1024
usage.freeMem = mem[3] * 1024 * 1024
var swap = split(lines[3])
usage.totalSwap = swap[1] * 1024 * 1024
usage.freeSwap = swap[3] * 1024 * 1024
var cache = split(lines[2])
usage.totalCache = (+cache[2] + cache[2]) * 1024 * 1024
usage.freeCache = cache[3] * 1024 * 1024
return usage
} catch (e) {
// 不支持free 命令
return {
totalMem: os.totalmem(),
freeMem: os.freemem(),
totalSwap: 0,
freeSwap: 0,
totalCache: 0,
freeCache: 0
}
}
} else if (process.platform === 'darwin') {
// mac
return {
totalMem: os.totalmem(),
freeMem: os.freemem(),
totalSwap: 0,
freeSwap: 0,
totalCache: 0,
freeCache: 0
}
} else {
return {}
}
}
'use strict'
/**
* Base Class for System info
* */
class SystemBaseInfo {
constructor(name, properties, content) {
this.name = name
this.children = []
this.attrs = properties || {}
this.content = content
}
toString() {
// to xml item
var tag = '<' + this.name
var attrKeys = Object.keys(this.attrs)
if (attrKeys.length) {
tag += (' ' + attrKeys.map(key => key + '=' + '"' + this.attrs[key] + '"').join(' '))
}
if (this.children.length) {
tag += '>\n'
this.children.forEach(child => {
tag += child.toString()
})
tag += ('</' + this.name + '>\n')
} else if (this.content) {
tag += ('>' + this.content + '</' + this.name + '>\n')
} else {
tag += '/>\n'
}
return tag
}
addChild(childInfo) {
if (childInfo && childInfo instanceof SystemBaseInfo) {
this.children.push(childInfo)
}
}
}
module.exports = SystemBaseInfo
'use strict'
const os = require('os')
const SystemInfo = require('./SystemBaseInfo')
const Time = require('../message/util/time')
const Disk = require('./Disk')
const Mem = require('./Memory')
var userName = ''
if (os.userInfo) {
userName = os.userInfo().username
} else {
userName = require('child_process').execSync('whoami', {encoding: 'utf8', timeout: 1000}).replace('\n', '')
}
/**
* @return {SystemInfo} base system info
*/
exports.SystemInfoCollector = function* () {
let mem = yield Mem.usage()
let rootDisk = yield Disk.usage('/')
let status = new SystemInfo('status', {
timestamp: Time.date2str(new Date())
})
// System Extension
let systemExtension = new SystemInfo('extension', {
id: 'System'
})
status.addChild(systemExtension)
systemExtension.addChild(new SystemInfo('extensionDetail', {
id: 'LoadAverage',
value: os.loadavg()[0],
}))
systemExtension.addChild(new SystemInfo('extensionDetail', {
id: 'FreePhysicalMemory',
value: mem.freeMem,
}))
systemExtension.addChild(new SystemInfo('extensionDetail', {
id: 'Cache/Buffer',
value: mem.freeCache / 1024 / 1024
}))
systemExtension.addChild(new SystemInfo('extensionDetail', {
id: 'FreeSwapSpaceSize',
value: mem.freeSwap
}))
systemExtension.addChild(new SystemInfo('extensionDetail', {
id: 'HeapUsage',
value: process.memoryUsage().heapUsed / 1024 / 1024
}))
// Disk Extension
let diskExtension = new SystemInfo('extension', {
id: 'Disk'
})
status.addChild(diskExtension)
if (rootDisk) {
diskExtension.addChild(new SystemInfo('extensionDetail', {
id: '/ Free',
value: rootDisk.free
}))
}
return status
}
'use strict'
const HeartBeat = require('./message/heartbeat')
const os = require('os')
const co = require('co')
const collector = require('./sys/collector').SystemInfoCollector
let config = require('./config')()
exports.collectStart = function (cat) {
if (process.env && process.env.pm_id) {
// in pm2 , only one worker does this collection
try {
if (process.env.pm_id % os.cpus().length !== 0) {
return
}
} catch (e) {
}
}
cat.logEvent('Reboot', '' + config.ip)
function sys() {
co(function* () {
let t = cat.newTransaction('System', 'Status')
let status = yield collector()
cat.treeManager.addMessage(new HeartBeat({
data: '<?xml version="1.0" encoding="utf-8"?>\n' + status.toString()
}))
t.setStatus(cat.STATUS.SUCCESS)
t.complete()
})
}
sys()
setInterval(sys, 60000)
}
'use strict'
const spawn = require('child_process').spawn
exports.exec = function (cmd) {
return new Promise((resolve, reject) => {
let shell = spawn('sh', ['-c', cmd])
let result = []
shell.stdout.on('data', (data) => {
result.push(data.toString())
})
shell.on('error', e => {
reject(e)
})
shell.on('close', code => {
if (code) {
reject(new Error('exec [' + cmd + '] fail'))
} else {
resolve(result.join())
}
})
})
}
此差异已折叠。
{
"name": "nodecat",
"version": "1.0.0",
"description": "Cat client for Node.js.",
"main": "index.js",
"scripts": {
"test": "mocha",
"release": "standard-version",
"build": "node-gyp configure && node-gyp build"
},
"keywords": [
"cat",
"client",
"nodejs"
],
"author": "ywang1724",
"dependencies": {
"buffer-builder": "^0.2.0",
"debug": "^2.2.0",
"mkdirp": "^0.5.1",
"moment": "^2.10.6",
"request": "^2.67.0",
"xml2js": "^0.4.15"
},
"devDependencies": {
"cz-conventional-changelog": "^2.0.0",
"eslint": "^3.19.0",
"eslint-config-standard": "^10.2.1",
"eslint-config-yayajing": "^1.0.0",
"eslint-plugin-import": "^2.3.0",
"eslint-plugin-jest": "^20.0.3",
"eslint-plugin-node": "^4.2.2",
"eslint-plugin-promise": "^3.5.0",
"eslint-plugin-standard": "^3.0.1",
"jest": "^20.0.4",
"mocha": "^2.3.4",
"node-cmake": "^2.5.1",
"standard-version": "^4.0.0"
},
"config": {
"commitizen": {
"path": "./node_modules/cz-conventional-changelog"
}
}
}
//
// Created by Terence on 2018/9/3.
//
#ifndef NODECAT_DEBUG_H
#define NODECAT_DEBUG_H
#include <iostream>
#define DEBUG_MODE 1
#ifdef DEBUG_MODE
#define debuginfo(s) std::cout << s << std::endl;
#else
#define debuginfo(s)
#endif
#endif //NODECAT_DEBUG_H
#include <node.h>
#include "client.h"
#include "debug.h"
using namespace v8;
using namespace std;
namespace catapi {
/**
* Initialize cat
* @param args
*/
void Init(const FunctionCallbackInfo<Value> &args) {
Isolate *isolate = args.GetIsolate();
if (args.Length() < 1) {
isolate->ThrowException(Exception::TypeError(String::NewFromUtf8(isolate, "Appkey is required")));
return;
}
String::Utf8Value str(args[0]->ToString());
CatClientConfig config = DEFAULT_CCAT_CONFIG;
config.enableHeartbeat = 0;
config.enableMultiprocessing = 1;
catClientInitWithConfig((const char *) (*str), &config);
}
/**
* Destroy cat
* @param args
*/
void Destroy(const FunctionCallbackInfo<Value> &args) {
catClientDestroy();
}
void InnerSendNode(Isolate *isolate, Local<Object> node) {
if (node->IsNull() || node->IsUndefined()) {
return;
}
String::Utf8Value messageType((node->Get(String::NewFromUtf8(isolate, "messageType")))->ToString());
if (strcmp(*messageType, "transaction") == 0) {
String::Utf8Value type((node->Get(String::NewFromUtf8(isolate, "type")))->ToString());
String::Utf8Value name((node->Get(String::NewFromUtf8(isolate, "name")))->ToString());
String::Utf8Value status((node->Get(String::NewFromUtf8(isolate, "status")))->ToString());
String::Utf8Value data((node->Get(String::NewFromUtf8(isolate, "data")))->ToString());
double begin = node->Get(String::NewFromUtf8(isolate, "beginTimestamp"))->NumberValue();
double end = node->Get(String::NewFromUtf8(isolate, "endTimestamp"))->NumberValue();
CatTransaction *t = newTransaction((const char *) (*type), (const char *) (*name));
t->setDurationInMillis(t, static_cast<unsigned long long int>(end - begin));
t->setTimestamp(t, static_cast<unsigned long long int>(begin));
t->setStatus(t, (const char *) (*status));
t->addData(t, (const char *) (*data));
// Iterate children recursively
Handle<Array> children = Handle<Array>::Cast(node->Get(String::NewFromUtf8(isolate, "children")));
int count = children->Length();
for (u_int32_t i = 0; i < count; i++) {
InnerSendNode(isolate, Handle<Object>::Cast(children->Get(i)));
}
t->complete(t);
} else if (strcmp(*messageType, "event") == 0) {
String::Utf8Value type((node->Get(String::NewFromUtf8(isolate, "type")))->ToString());
String::Utf8Value name((node->Get(String::NewFromUtf8(isolate, "name")))->ToString());
String::Utf8Value status((node->Get(String::NewFromUtf8(isolate, "status")))->ToString());
String::Utf8Value data((node->Get(String::NewFromUtf8(isolate, "data")))->ToString());
double begin((node->Get(String::NewFromUtf8(isolate, "beginTimestamp")))->NumberValue());
CatEvent *e = newEvent((const char *) (*type), (const char *) (*name));
e->setTimestamp(e, static_cast<unsigned long long int>(begin));
e->addData(e, (const char *) (*data));
e->setStatus(e, (const char *) (*status));
e->complete(e);
} else if (strcmp(*messageType, "heartbeat") == 0) {
String::Utf8Value type((node->Get(String::NewFromUtf8(isolate, "type")))->ToString());
String::Utf8Value name((node->Get(String::NewFromUtf8(isolate, "name")))->ToString());
String::Utf8Value data((node->Get(String::NewFromUtf8(isolate, "data")))->ToString());
CatHeartBeat *h = newHeartBeat((const char *) (*type), (const char *) (*name));
h->addData(h, (const char *) (*data));
h->setStatus(h, CAT_SUCCESS);
h->complete(h);
}
}
void SendTree(const FunctionCallbackInfo<Value> &args) {
Isolate *isolate = args.GetIsolate();
if (args.Length() == 0) {
return;
}
Handle<Object> tree = Handle<Object>::Cast(args[0]);
Handle<Object> root = Handle<Object>::Cast(tree->Get(String::NewFromUtf8(isolate, "root")));
InnerSendNode(isolate, root);
}
void exports(Local<Object> exports) {
NODE_SET_METHOD(exports, "init", Init);
NODE_SET_METHOD(exports, "destroy", Destroy);
NODE_SET_METHOD(exports, "sendTree", SendTree);
}
NODE_MODULE(nodecat, exports)
} // namespace catapi
\ No newline at end of file
...@@ -110,7 +110,7 @@ with cat.Transaction("Transaction", "T1") as t: ...@@ -110,7 +110,7 @@ with cat.Transaction("Transaction", "T1") as t:
### Transaction apis ### Transaction apis
we offered a list of APIs to modify the transaction. We offered a list of APIs to modify the transaction.
* add\_data * add\_data
* set\_status * set\_status
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册