提交 99849ec7 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

Merge branch '2.0' of https://github.com/taosdata/TDengine into 2.0

# 超级表STable:多表聚合
TDengine要求每个数据采集点单独建表,这样能极大提高数据的插入/查询性能,但是导致系统中表的数量猛增,让应用对表的维护以及聚合、统计操作难度加大。为降低应用的开发难度,TDengine引入了超级表STable (Super Table)的概念。
TDengine要求每个数据采集点单独建表。独立建表的模式能够避免写入过程中的同步加锁,因此能够极大地提升数据的插入/查询性能。但是独立建表意味着系统中表的数量与采集点的数量在同一个量级。如果采集点众多,将导致系统中表的数量也非常庞大,让应用对表的维护以及聚合、统计操作难度加大。为降低应用的开发难度,TDengine引入了超级表(Super Table, 简称为STable)的概念。
## 什么是超级表
......@@ -9,14 +9,14 @@ STable是同一类型数据采集点的抽象,是同类型采集实例的集
TDengine扩展标准SQL语法用于定义STable,使用关键词tags指定标签信息。语法如下:
```mysql
CREATE TABLE <stable_name> (<field_name> TIMESTAMP, field_name1 field_type,…) TAGS(tag_name tag_type, …)
CREATE TABLE <stable_name> (<field_name> TIMESTAMP, field_name1 field_type,…) TAGS(tag_name tag_type, …)
```
其中tag_name是标签名,tag_type是标签的数据类型。标签可以使用时间戳之外的其他TDengine支持的数据类型,标签的个数最多为6个,名字不能与系统关键词相同,也不能与其他列名相同。如:
其中tag_name是标签名,tag_type是标签的数据类型。标签可以使用时间戳之外的其他TDengine支持的数据类型,标签的个数最多为32个,名字不能与系统关键词相同,也不能与其他列名相同。如:
```mysql
create table thermometer (ts timestamp, degree float)
tags (location binary(20), type int)
CREATE TABLE thermometer (ts timestamp, degree float)
TAGS (location binary(20), type int)
```
上述SQL创建了一个名为thermometer的STable,带有标签location和标签type。
......@@ -30,7 +30,7 @@ CREATE TABLE <tb_name> USING <stb_name> TAGS (tag_value1,...)
沿用上面温度计的例子,使用超级表thermometer建立单个温度计数据表的语句如下:
```mysql
create table t1 using thermometer tags ('beijing', 10)
CREATE TABLE t1 USING thermometer TAGS ('beijing', 10)
```
上述SQL以thermometer为模板,创建了名为t1的表,这张表的Schema就是thermometer的Schema,但标签location值为'beijing',标签type值为10。
......
......@@ -67,7 +67,7 @@ TDengine内嵌支持轻量级的消息订阅与推送服务。使用系统提供
TDengine的订阅与推送服务的状态是客户端维持,TDengine服务器并不维持。因此如果应用重启,从哪个时间点开始获取最新数据,由应用决定。
订阅相关API请见 [连接器](https://www.taosdata.com/cn/documentation/connector/)
订阅相关API文档请见 [C/C++ 数据订阅接口](https://www.taosdata.com/cn/documentation/connector/#C/C++-%E6%95%B0%E6%8D%AE%E8%AE%A2%E9%98%85%E6%8E%A5%E5%8F%A3),《[TDEngine中订阅的用途和用法](https://www.taosdata.com/blog/2020/02/12/1277.html)》则以一个示例详细介绍了这些API的用法
## 缓存 (Cache)
TDengine采用时间驱动缓存管理策略(First-In-First-Out,FIFO),又称为写驱动的缓存管理机制。这种策略有别于读驱动的数据缓存模式(Least-Recent-Use,LRU),直接将最近写入的数据保存在系统的缓存中。当缓存达到临界值的时候,将最早的数据批量写入磁盘。一般意义上来说,对于物联网数据的使用,用户最为关心最近产生的数据,即当前状态。TDengine充分利用了这一特性,将最近到达的(当前状态)数据保存在缓存中。
......
......@@ -62,7 +62,7 @@ Time series data is a sequence of data points over time. Inside a table, the dat
To reduce the development complexity and improve data consistency, TDengine provides the pub/sub functionality. To publish a message, you simply insert a record into a table. Compared with popular messaging tool Kafka, you subscribe to a table or a SQL query statement, instead of a topic. Once new data points arrive, TDengine will notify the application. The process is just like Kafka.
The detailed API will be introduced in the [connectors](https://www.taosdata.com/en/documentation/connector/) section.
The API documentation is at [C/C++ subscription API](https://www.taosdata.com/en/documentation/connector/#C/C++-subscription-API) section, and you can find more information from blog article (only Chinese version at present) [The usage of subscription](https://www.taosdata.com/blog/2020/02/12/1277.html).
##Caching
TDengine allocates a fixed-size buffer in memory, the newly arrived data will be written into the buffer first. Every device or table gets one or more memory blocks. For typical IoT scenarios, the hot data shall always be newly arrived data, they are more important for timely analysis. Based on this observation, TDengine manages the cache blocks in First-In-First-Out strategy. If no enough space in the buffer, the oldest data will be saved into hard disk first, then be overwritten by newly arrived data. TDengine also guarantees every device can keep at least one block of data in the buffer.
......
......@@ -4,11 +4,11 @@ PROJECT(TDengine)
ADD_SUBDIRECTORY(os)
ADD_SUBDIRECTORY(util)
ADD_SUBDIRECTORY(rpc)
ADD_SUBDIRECTORY(client)
ADD_SUBDIRECTORY(kit)
ADD_SUBDIRECTORY(modules)
ADD_SUBDIRECTORY(sdb)
ADD_SUBDIRECTORY(mnode)
ADD_SUBDIRECTORY(dnode)
ADD_SUBDIRECTORY(vnode)
ADD_SUBDIRECTORY(connector/jdbc)
#ADD_SUBDIRECTORY(client)
#ADD_SUBDIRECTORY(kit)
#ADD_SUBDIRECTORY(plugins)
#ADD_SUBDIRECTORY(sdb)
#ADD_SUBDIRECTORY(mnode)
#ADD_SUBDIRECTORY(dnode)
#ADD_SUBDIRECTORY(vnode)
#ADD_SUBDIRECTORY(connector/jdbc)
......@@ -4,8 +4,10 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
INCLUDE_DIRECTORIES(jni)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
AUX_SOURCE_DIRECTORY(./src SRC)
AUX_SOURCE_DIRECTORY(src SRC)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux)
......
......@@ -30,6 +30,7 @@ extern "C" {
#include "taosdef.h"
#include "tsqlfunction.h"
#include "tutil.h"
#include "trpc.h"
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \
(res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows)
......@@ -324,6 +325,7 @@ typedef struct _sql_obj {
int64_t stime;
uint32_t queryId;
void * thandle;
SRpcIpSet ipSet;
void * pStream;
void * pSubscription;
char * sqlstr;
......@@ -371,12 +373,6 @@ typedef struct _sstream {
struct _sstream *prev, *next;
} SSqlStream;
typedef struct {
char numOfIps;
uint32_t ip[TSDB_MAX_MGMT_IPS];
char ipstr[TSDB_MAX_MGMT_IPS][TSDB_IPv4ADDR_LEN];
} SIpStrList;
// tscSql API
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
......@@ -461,7 +457,7 @@ extern void * tscQhandle;
extern int tscKeepConn[];
extern int tsInsertHeadSize;
extern int tscNumOfThreads;
extern SIpStrList tscMgmtIpList;
extern SRpcIpSet tscMgmtIpList;
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);
......
......@@ -25,6 +25,7 @@
#include "tscSQLParser.h"
#include "tutil.h"
#include "tnote.h"
#include "tsched.h"
static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
......
......@@ -279,7 +279,7 @@ void tscKillConnection(STscObj *pObj) {
SSqlObj *pSql = pObj->sqlList;
while (pSql) {
taosStopRpcConn(pSql->thandle);
//taosStopRpcConn(pSql->thandle);
pSql = pSql->next;
}
......
......@@ -900,7 +900,7 @@ bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField) {
return false;
}
if (pTagField->type < TSDB_DATA_TYPE_BOOL && pTagField->type > TSDB_DATA_TYPE_NCHAR) {
if ((pTagField->type < TSDB_DATA_TYPE_BOOL) || (pTagField->type > TSDB_DATA_TYPE_NCHAR)) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
return false;
}
......@@ -1655,7 +1655,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
SSchema* pSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);
// functions can not be applied to tags
if (index.columnIndex >= pMeterMetaInfo->pMeterMeta->numOfColumns) {
if ((index.columnIndex >= pMeterMetaInfo->pMeterMeta->numOfColumns) || (index.columnIndex < 0)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg6);
}
......@@ -5663,4 +5663,4 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
bool hasDefaultQueryTimeRange(SQueryInfo *pQueryInfo) {
return (pQueryInfo->stime == 0 && pQueryInfo->etime == INT64_MAX) ||
(pQueryInfo->stime == INT64_MAX && pQueryInfo->etime == 0);
}
\ No newline at end of file
}
......@@ -31,10 +31,14 @@
#define TSC_MGMT_VNODE 999
SIpStrList tscMgmtIpList;
SRpcIpSet tscMgmtIpList;
int tsMasterIndex = 0;
int tsSlaveIndex = 1;
//temp
SRpcIpSet tscMgmtIpSet;
SRpcIpSet tscDnodeIpSet;
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
......@@ -53,7 +57,7 @@ void tscPrintMgmtIp() {
tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps);
} else {
for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) {
tscTrace("mgmt index:%d ip:%s", i, tscMgmtIpList.ipstr[i]);
tscTrace("mgmt index:%d ip:%s", i, tscMgmtIpList.ipStr[i]);
}
}
}
......@@ -62,7 +66,7 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) {
tscMgmtIpList.numOfIps = pIpList->numOfIps;
if (memcmp(tscMgmtIpList.ip, pIpList->ip, pIpList->numOfIps * 4) != 0) {
for (int i = 0; i < pIpList->numOfIps; ++i) {
tinet_ntoa(tscMgmtIpList.ipstr[i], pIpList->ip[i]);
tinet_ntoa(tscMgmtIpList.ipStr[i], pIpList->ip[i]);
tscMgmtIpList.ip[i] = pIpList->ip[i];
}
tscTrace("cluster mgmt IP list:");
......@@ -73,9 +77,9 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) {
void tscSetMgmtIpListFromEdge() {
if (tscMgmtIpList.numOfIps != 2) {
tscMgmtIpList.numOfIps = 2;
strcpy(tscMgmtIpList.ipstr[0], tsMasterIp);
strcpy(tscMgmtIpList.ipStr[0], tsMasterIp);
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
strcpy(tscMgmtIpList.ipstr[1], tsMasterIp);
strcpy(tscMgmtIpList.ipStr[1], tsMasterIp);
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
tscTrace("edge mgmt IP list:");
tscPrintMgmtIp();
......@@ -168,7 +172,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
if (tscShouldFreeHeatBeat(pObj->pHb)) {
tscTrace("%p free HB object and release connection, pConn:%p", pObj, pObj->pHb->thandle);
taosCloseRpcConn(pObj->pHb->thandle);
//taosCloseRpcConn(pObj->pHb->thandle);
tscFreeSqlObj(pObj->pHb);
tscCloseTscObj(pObj);
......@@ -178,6 +182,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
tscProcessSql(pObj->pHb);
}
void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
STscObj *pTscObj = pSql->pTscObj;
if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) {
......@@ -187,23 +192,24 @@ void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
if (pSql->cmd.command > TSDB_SQL_READ && pSql->index == 0) pSql->index = 1;
void *thandle = taosGetConnFromCache(tscConnCache, tscMgmtIpList.ip[pSql->index], TSC_MGMT_VNODE, pTscObj->user);
if (thandle == NULL) {
SRpcConnInit connInit;
memset(&connInit, 0, sizeof(connInit));
connInit.cid = 0;
connInit.sid = 0;
connInit.meterId = pSql->pTscObj->user;
connInit.peerId = 0;
connInit.shandle = pTscMgmtConn;
connInit.ahandle = pSql;
connInit.peerPort = tsMgmtShellPort;
connInit.spi = 1;
connInit.encrypt = 0;
connInit.secret = pSql->pTscObj->pass;
connInit.peerIp = tscMgmtIpList.ipstr[pSql->index];
thandle = taosOpenRpcConn(&connInit, pCode);
}
// if (thandle == NULL) {
// SRpcConnInit connInit;
// memset(&connInit, 0, sizeof(connInit));
// connInit.cid = 0;
// connInit.sid = 0;
// connInit.meterId = pSql->pTscObj->user;
// connInit.peerId = 0;
// connInit.shandle = pTscMgmtConn;
// connInit.ahandle = pSql;
// connInit.peerPort = tsMgmtShellPort;
// connInit.spi = 1;
// connInit.encrypt = 0;
// connInit.secret = pSql->pTscObj->pass;
//
// connInit.peerIp = tscMgmtIpList.ipstr[pSql->index];
// thandle = taosOpenRpcConn(&connInit, pCode);
// }
pSql->thandle = thandle;
pSql->ip = tscMgmtIpList.ip[pSql->index];
......@@ -267,23 +273,23 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
void *thandle =
taosGetConnFromCache(tscConnCache, pVPeersDesc[pSql->index].ip, pVPeersDesc[pSql->index].vnode, pTscObj->user);
if (thandle == NULL) {
SRpcConnInit connInit;
tinet_ntoa(ipstr, pVPeersDesc[pSql->index].ip);
memset(&connInit, 0, sizeof(connInit));
connInit.cid = vidIndex;
connInit.sid = 0;
connInit.spi = 0;
connInit.encrypt = 0;
connInit.meterId = pSql->pTscObj->user;
connInit.peerId = htonl((pVPeersDesc[pSql->index].vnode << TSDB_SHELL_VNODE_BITS));
connInit.shandle = pVnodeConn;
connInit.ahandle = pSql;
connInit.peerIp = ipstr;
connInit.peerPort = tsVnodeShellPort;
thandle = taosOpenRpcConn(&connInit, pCode);
vidIndex = (vidIndex + 1) % tscNumOfThreads;
}
// if (thandle == NULL) {
// SRpcConnInit connInit;
// tinet_ntoa(ipstr, pVPeersDesc[pSql->index].ip);
// memset(&connInit, 0, sizeof(connInit));
// connInit.cid = vidIndex;
// connInit.sid = 0;
// connInit.spi = 0;
// connInit.encrypt = 0;
// connInit.meterId = pSql->pTscObj->user;
// connInit.peerId = htonl((pVPeersDesc[pSql->index].vnode << TSDB_SHELL_VNODE_BITS));
// connInit.shandle = pVnodeConn;
// connInit.ahandle = pSql;
// connInit.peerIp = ipstr;
// connInit.peerPort = tsVnodeShellPort;
// thandle = taosOpenRpcConn(&connInit, pCode);
// vidIndex = (vidIndex + 1) % tscNumOfThreads;
// }
pSql->thandle = thandle;
pSql->ip = pVPeersDesc[pSql->index].ip;
......@@ -291,6 +297,8 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
tscTrace("%p vnode:%d ip:%p index:%d is picked up, pConn:%p", pSql, pVPeersDesc[pSql->index].vnode,
pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle);
//TODO fetch from vpeerdesc
pSql->ipSet = tscMgmtIpSet;
break;
}
......@@ -326,25 +334,29 @@ int tscSendMsgToServer(SSqlObj *pSql) {
size_t totalLen = pSql->cmd.payloadLen + tsRpcHeadSize + sizeof(STaosDigest);
// the memory will be released by taosProcessResponse, so no memory leak here
char *buf = malloc(totalLen);
if (NULL == buf) {
char *pStart = rpcMallocCont(pSql->cmd.payloadLen);
if (NULL == pStart) {
tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
memcpy(buf, pSql->cmd.payload, totalLen);
memcpy(pStart, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]);
char *pStart = taosBuildReqHeader(pSql->thandle, pSql->cmd.msgType, buf);
if (pStart) {
/*
* this SQL object may be released by other thread due to the completion of this query even before the log
* is dumped to log file. So the signature needs to be kept in a local variable.
*/
uint64_t signature = (uint64_t)pSql->signature;
if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, buf);
//if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, pStart);
int ret;
if (pSql->cmd.command < TSDB_SQL_MGMT)
ret = rpcSendRequest(pTscMgmtConn, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql);
else
ret = rpcSendRequest(pVnodeConn, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql);
int ret = taosSendMsgToPeerH(pSql->thandle, pStart, pSql->cmd.payloadLen, pSql);
if (ret >= 0) {
code = 0;
}
......
......@@ -64,15 +64,15 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
}
if (ip && ip[0]) {
tscMgmtIpList.numOfIps = 4;
strcpy(tscMgmtIpList.ipstr[0], ip);
tscMgmtIpList.numOfIps = 3;
strcpy(tscMgmtIpList.ipStr[0], ip);
tscMgmtIpList.ip[0] = inet_addr(ip);
strcpy(tscMgmtIpList.ipstr[1], ip);
tscMgmtIpList.ip[1] = inet_addr(ip);
strcpy(tscMgmtIpList.ipstr[2], tsMasterIp);
tscMgmtIpList.ip[2] = inet_addr(tsMasterIp);
strcpy(tscMgmtIpList.ipstr[3], tsSecondIp);
tscMgmtIpList.ip[3] = inet_addr(tsSecondIp);
strcpy(tscMgmtIpList.ipStr[1], tsMasterIp);
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
strcpy(tscMgmtIpList.ipStr[2], tsSecondIp);
tscMgmtIpList.ip[2] = inet_addr(tsSecondIp);
tscMgmtIpList.index = 0;
tscMgmtIpList.port = tsMgmtShellPort;
}
pObj = (STscObj *)malloc(sizeof(STscObj));
......@@ -1034,7 +1034,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
pCmd->command = TSDB_SQL_MULTI_META;
pCmd->count = 0;
int code = TSDB_CODE_INVALID_METER_ID;
int code = TSDB_CODE_INVALID_TABLE_ID;
char *str = (char *)tblNameList;
SQueryInfo *pQueryInfo = NULL;
......@@ -1070,7 +1070,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
// Check if the table name available or not
if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_INVALID_METER_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
sprintf(pCmd->payload, "table name is invalid");
return code;
}
......@@ -1080,7 +1080,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
}
if (++pCmd->count > TSDB_MULTI_METERMETA_MAX_NUM) {
code = TSDB_CODE_INVALID_METER_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
sprintf(pCmd->payload, "tables over the max number");
return code;
}
......
......@@ -19,7 +19,7 @@
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#include "tsched.h"
#include "taosmsg.h"
#include "tscUtil.h"
#include "tsclient.h"
......
......@@ -24,8 +24,9 @@
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#include "tsched.h"
#include "tsclient.h"
// global, not configurable
void * pVnodeConn;
void * pVMeterConn;
......@@ -94,18 +95,17 @@ void taos_init_imp() {
if (tsTscEnableRecordSql != 0) {
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note");
}
tscMgmtIpList.numOfIps = 2;
strcpy(tscMgmtIpList.ipstr[0], tsMasterIp);
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
strcpy(tscMgmtIpList.ipstr[1], tsMasterIp);
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
tscMgmtIpList.index = 0;
tscMgmtIpList.port = tsMgmtShellPort;
tscMgmtIpList.numOfIps = 1;
strcpy(tscMgmtIpList.ipStr[0], tsMasterIp);
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
if (tsSecondIp[0]) {
tscMgmtIpList.numOfIps = 3;
strcpy(tscMgmtIpList.ipstr[2], tsSecondIp);
tscMgmtIpList.ip[2] = inet_addr(tsSecondIp);
tscMgmtIpList.numOfIps = 2;
strcpy(tscMgmtIpList.ipStr[1], tsSecondIp);
tscMgmtIpList.ip[1] = inet_addr(tsSecondIp);
}
tscInitMsgs();
......@@ -132,42 +132,23 @@ void taos_init_imp() {
rpcInit.label = "TSC-vnode";
rpcInit.numOfThreads = tscNumOfThreads;
rpcInit.fp = tscProcessMsgFromServer;
rpcInit.bits = 20;
rpcInit.numOfChanns = tscNumOfThreads;
rpcInit.sessionsPerChann = tsMaxVnodeConnections / tscNumOfThreads;
rpcInit.idMgmt = TAOS_ID_FREE;
rpcInit.noFree = 0;
rpcInit.sessions = tsMaxVnodeConnections;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C();
rpcInit.qhandle = tscQhandle;
pVnodeConn = taosOpenRpc(&rpcInit);
pVnodeConn = rpcOpen(&rpcInit);
if (pVnodeConn == NULL) {
tscError("failed to init connection to vnode");
return;
}
for (int i = 0; i < tscNumOfThreads; ++i) {
int retVal = taosOpenRpcChann(pVnodeConn, i, rpcInit.sessionsPerChann);
if (0 != retVal) {
tError("TSC-vnode, failed to open rpc chann");
taosCloseRpc(pVnodeConn);
return;
}
}
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp;
rpcInit.localPort = 0;
rpcInit.label = "TSC-mgmt";
rpcInit.numOfThreads = 1;
rpcInit.fp = tscProcessMsgFromServer;
rpcInit.bits = 20;
rpcInit.numOfChanns = 1;
rpcInit.sessionsPerChann = tsMaxMgmtConnections;
rpcInit.idMgmt = TAOS_ID_FREE;
rpcInit.noFree = 0;
rpcInit.sessions = tsMaxMgmtConnections;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C();
rpcInit.qhandle = tscQhandle;
pTscMgmtConn = taosOpenRpc(&rpcInit);
pTscMgmtConn = rpcOpen(&rpcInit);
if (pTscMgmtConn == NULL) {
tscError("failed to init connection to mgmt");
return;
......@@ -183,7 +164,7 @@ void taos_init_imp() {
if (tscCacheHandle == NULL) tscCacheHandle = taosInitDataCache(tsMaxMeterConnections / 2, tscTmr, refreshTime);
tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, taosCloseRpcConn, tscTmr, tsShellActivityTimer * 1000);
tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, NULL/*taosCloseRpcConn*/, tscTmr, tsShellActivityTimer * 1000);
initialized = 1;
tscTrace("client is initialized successfully");
......
......@@ -3992,9 +3992,9 @@
}
},
"yarn": {
"version": "1.21.1",
"resolved": "https://registry.npmjs.org/yarn/-/yarn-1.21.1.tgz",
"integrity": "sha512-dQgmJv676X/NQczpbiDtc2hsE/pppGDJAzwlRiADMTvFzYbdxPj2WO4PcNyriSt2c4jsCMpt8UFRKHUozt21GQ=="
"version": "1.22.0",
"resolved": "https://registry.npmjs.org/yarn/-/yarn-1.22.0.tgz",
"integrity": "sha512-KMHP/Jq53jZKTY9iTUt3dIVl/be6UPs2INo96+BnZHLKxYNTfwMmlgHTaMWyGZoO74RI4AIFvnWhYrXq2USJkg=="
}
}
}
......@@ -39,7 +39,7 @@
},
"dependencies": {
"lodash": "^4.17.13",
"yarn": "^1.21.1"
"yarn": "^1.22.0"
},
"homepage": "https://github.com/taosdata/TDengine/tree/develop/src/connector/grafana/tdengine"
}
......@@ -2957,7 +2957,7 @@ yargs@~3.10.0:
decamelize "^1.0.0"
window-size "0.1.0"
yarn@^1.21.1:
version "1.21.1"
resolved "https://registry.yarnpkg.com/yarn/-/yarn-1.21.1.tgz#1d5da01a9a03492dc4a5957befc1fd12da83d89c"
integrity sha512-dQgmJv676X/NQczpbiDtc2hsE/pppGDJAzwlRiADMTvFzYbdxPj2WO4PcNyriSt2c4jsCMpt8UFRKHUozt21GQ==
yarn@^1.22.0:
version "1.22.0"
resolved "https://registry.yarnpkg.com/yarn/-/yarn-1.22.0.tgz#acf82906e36bcccd1ccab1cfb73b87509667c881"
integrity sha512-KMHP/Jq53jZKTY9iTUt3dIVl/be6UPs2INo96+BnZHLKxYNTfwMmlgHTaMWyGZoO74RI4AIFvnWhYrXq2USJkg==
......@@ -48,7 +48,7 @@ public enum TSDBError {
TSDB_CODE_INVALID_VALUE(24, "invalid value"),
TSDB_CODE_REDIRECT(25, "service not available"),
TSDB_CODE_ALREADY_THERE(26, "already there"),
TSDB_CODE_INVALID_METER_ID(27, "invalid meter ID"),
TSDB_CODE_INVALID_TABLE_ID(27, "invalid meter ID"),
TSDB_CODE_INVALID_SQL(28, "invalid SQL"), // this message often comes with additional info which will vary based on the specific error situation
TSDB_CODE_NETWORK_UNAVAIL(29, "failed to connect to server"),
TSDB_CODE_INVALID_MSG_LEN(30, "invalid msg len"),
......
......@@ -22,19 +22,14 @@ extern "C" {
#include <stdint.h>
#include <stdbool.h>
#include "tsched.h"
#include "dnode.h"
int dnodeProcessCreateTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
int dnodeProcessRemoveTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
extern void *dmQhandle;
void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
void dnodeSendVpeerCfgMsg(int32_t vnode);
void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid);
extern int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType);
extern int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code);
#ifdef __cplusplus
}
#endif
......
......@@ -56,6 +56,8 @@ void dnodeAllocModules();
int32_t dnodeInitModules();
void dnodeCleanUpModules();
extern void (*dnodeStartModules)();
#ifdef __cplusplus
}
#endif
......
......@@ -22,8 +22,6 @@ extern "C" {
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
#include "dnode.h"
typedef enum {
TSDB_DNODE_RUN_STATUS_INITIALIZE,
......@@ -39,6 +37,7 @@ extern void (*dnodeParseParameterK)();
extern int32_t tsMaxQueues;
extern void ** tsRpcQhandle;
extern void *tsQueryQhandle;
extern void *tsDnodeMgmtQhandle;
int32_t dnodeInitSystem();
void dnodeCleanUpSystem();
......
......@@ -21,9 +21,10 @@ extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include "taosdef.h"
#include "taosmsg.h"
#include "tstatus.h"
/*
* Open all Vnodes in the local data directory
......@@ -38,34 +39,34 @@ int32_t dnodeCleanupVnodes();
/*
* Check if vnode already exists
*/
int32_t dnodeCheckVnodeExist(int vid);
bool dnodeCheckVnodeExist(int32_t vid);
/*
* Create vnode with specified configuration and open it
* if exist, config it
*/
//tsdb_repo_t* dnodeCreateVnode(int vid, SVnodeCfg *cfg);
void* dnodeCreateVnode(int vid, SVnodeCfg *cfg);
int32_t dnodeCreateVnode(int32_t vnode, SVPeersMsg *cfg);
/*
* Modify vnode configuration information
* Remove vnode from local repository
*/
int32_t dnodeConfigVnode(int vid, SVnodeCfg *cfg);
int32_t dnodeDropVnode(int32_t vnode);
/*
* Modify vnode replication information
* Get the vnode object that has been opened
*/
int32_t dnodeConfigVnodePeers(int vid/*, SVpeerCfgMsg *cfg*/);
//tsdb_repo_t* dnodeGetVnode(int vid);
void* dnodeGetVnode(int vid);
/*
* Remove vnode from local repository
* get the status of vnode
*/
int32_t dnodeDropVnode(int vid);
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode);
/*
* Get the vnode object that has been opened
* Check if vnode already exists, and table exist in this vnode
*/
//tsdb_repo_t* dnodeGetVnode(int vid);
void* dnodeGetVnode(int vid);
bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid);
#ifdef __cplusplus
}
......
......@@ -35,45 +35,26 @@ extern "C" {
void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn));
/*
* Create noraml table with specified configuration and open it
* Create table with specified configuration and open it
* if table already exist, update its schema and tag
*/
int32_t dnodeCreateNormalTable(SCreateNormalTableMsg *table);
int32_t dnodeCreateTable(SDCreateTableMsg *table);
/*
* Create stream table with specified configuration and open it
*/
int32_t dnodeCreateStreamTable(SCreateStreamTableMsg *table);
/*
* Create child table with specified configuration and open it
*/
int32_t dnodeCreateChildTable(SCreateChildTableMsg *table);
/*
* Modify normal table configuration information
*
*/
int32_t dnodeAlterNormalTable(SCreateNormalTableMsg *table);
/*
* Modify stream table configuration information
* Remove table from local repository
*/
int32_t dnodeAlterStreamTable(SCreateStreamTableMsg *table);
int32_t dnodeDropTable(int32_t vnode, int32_t sid, uint64_t uid);
/*
* Modify child table configuration information
* Create stream
* if stream already exist, update it
*/
int32_t dnodeAlterChildTable(SCreateChildTableMsg *table);
int32_t dnodeCreateStream(SAlterStreamMsg *stream);
/*
* Remove all child tables of supertable from local repository
*/
int32_t dnodeDropSuperTable(int vid, int sid, int64_t uid);
/*
* Remove table from local repository
*/
int32_t dnodeDropTable(int vid, int sid, int64_t uid);
int32_t dnodeDropSuperTable(uint64_t stableUid);
#ifdef __cplusplus
}
......
此差异已折叠。
......@@ -17,41 +17,41 @@
#include "os.h"
#include "tlog.h"
#include "tglobalcfg.h"
#include "mnode.h"
#include "http.h"
#include "monitor.h"
#include "dnodeModule.h"
#include "dnodeSystem.h"
#include "monitorSystem.h"
#include "httpSystem.h"
#include "mgmtSystem.h"
SModule tsModule[TSDB_MOD_MAX] = {0};
uint32_t tsModuleStatus = 0;
SModule tsModule[TSDB_MOD_MAX] = {0};
uint32_t tsModuleStatus = 0;
void dnodeAllocModules() {
tsModule[TSDB_MOD_MGMT].name = "mgmt";
tsModule[TSDB_MOD_MGMT].initFp = mgmtInitSystem;
tsModule[TSDB_MOD_MGMT].cleanUpFp = mgmtCleanUpSystem;
tsModule[TSDB_MOD_MGMT].startFp = mgmtStartSystem;
tsModule[TSDB_MOD_MGMT].stopFp = mgmtStopSystem;
tsModule[TSDB_MOD_MGMT].num = tsNumOfMPeers;
tsModule[TSDB_MOD_MGMT].curNum = 0;
tsModule[TSDB_MOD_MGMT].name = "mgmt";
tsModule[TSDB_MOD_MGMT].initFp = mgmtInitSystem;
tsModule[TSDB_MOD_MGMT].cleanUpFp = mgmtCleanUpSystem;
tsModule[TSDB_MOD_MGMT].startFp = mgmtStartSystem;
tsModule[TSDB_MOD_MGMT].stopFp = mgmtStopSystem;
tsModule[TSDB_MOD_MGMT].num = tsNumOfMPeers;
tsModule[TSDB_MOD_MGMT].curNum = 0;
tsModule[TSDB_MOD_MGMT].equalVnodeNum = tsMgmtEqualVnodeNum;
tsModule[TSDB_MOD_HTTP].name = "http";
tsModule[TSDB_MOD_HTTP].initFp = httpInitSystem;
tsModule[TSDB_MOD_HTTP].cleanUpFp = httpCleanUpSystem;
tsModule[TSDB_MOD_HTTP].startFp = httpStartSystem;
tsModule[TSDB_MOD_HTTP].stopFp = httpStopSystem;
tsModule[TSDB_MOD_HTTP].num = (tsEnableHttpModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_HTTP].curNum = 0;
tsModule[TSDB_MOD_HTTP].name = "http";
tsModule[TSDB_MOD_HTTP].initFp = httpInitSystem;
tsModule[TSDB_MOD_HTTP].cleanUpFp = httpCleanUpSystem;
tsModule[TSDB_MOD_HTTP].startFp = httpStartSystem;
tsModule[TSDB_MOD_HTTP].stopFp = httpStopSystem;
tsModule[TSDB_MOD_HTTP].num = (tsEnableHttpModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_HTTP].curNum = 0;
tsModule[TSDB_MOD_HTTP].equalVnodeNum = 0;
tsModule[TSDB_MOD_MONITOR].name = "monitor";
tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem;
tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem;
tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem;
tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem;
tsModule[TSDB_MOD_MONITOR].num = (tsEnableMonitorModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_MONITOR].curNum = 0;
tsModule[TSDB_MOD_MONITOR].name = "monitor";
tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem;
tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem;
tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem;
tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem;
tsModule[TSDB_MOD_MONITOR].num = (tsEnableMonitorModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_MONITOR].curNum = 0;
tsModule[TSDB_MOD_MONITOR].equalVnodeNum = 0;
}
......@@ -71,7 +71,7 @@ void dnodeCleanUpModules() {
}
void dnodeProcessModuleStatus(uint32_t status) {
if (tsDnodeRunStatus) {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
return;
}
......@@ -112,7 +112,7 @@ int32_t dnodeInitModules() {
}
}
return 0;
return TSDB_CODE_SUCCESS;
}
void dnodeStartModulesImp() {
......@@ -128,4 +128,5 @@ void dnodeStartModulesImp() {
(*tsModule[TSDB_MOD_MGMT].cleanUpFp)();
}
}
void (*dnodeStartModules)() = dnodeStartModulesImp;
......@@ -17,6 +17,7 @@
#include "os.h"
#include "taoserror.h"
#include "tlog.h"
#include "tsched.h"
#include "dnode.h"
#include "dnodeRead.h"
#include "dnodeSystem.h"
......@@ -32,14 +33,14 @@ void dnodeQueryData(SQueryMeterMsg *pQuery, void *pConn, void (*callback)(int32_
}
static void dnodeExecuteRetrieveData(SSchedMsg *pSched) {
SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *)pSched->msg;
//SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *)pSched->msg;
SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle;
void *pConn = pSched->ahandle;
//examples
int32_t code = TSDB_CODE_INVALID_QHANDLE;
void *pQInfo = NULL; //get from pConn
(*callback)(code, NULL, pConn);
(*callback)(code, pQInfo, pConn);
//TODO build response here
......@@ -47,8 +48,8 @@ static void dnodeExecuteRetrieveData(SSchedMsg *pSched) {
}
void dnodeRetrieveData(SRetrieveMeterMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp) {
int8_t *msg = malloc(sizeof(pRetrieve));
memcpy(msg, pRetrieve, sizeof(pRetrieve));
int8_t *msg = malloc(sizeof(SRetrieveMeterMsg));
memcpy(msg, pRetrieve, sizeof(SRetrieveMeterMsg));
SSchedMsg schedMsg;
schedMsg.msg = msg;
......@@ -62,6 +63,8 @@ int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveMeterRsp *retrievalRsp) {
return 0;
}
int32_t dnodeGetRetrieveDataSize(void *pQInfo) {}
int32_t dnodeGetRetrieveDataSize(void *pQInfo) {
return 0;
}
......@@ -29,7 +29,7 @@
#include "dnodeRead.h"
#include "dnodeSystem.h"
#include "dnodeShell.h"
#include "dnodeUtil.h"
#include "dnodeVnodeMgmt.h"
#include "dnodeWrite.h"
static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn);
......@@ -40,19 +40,19 @@ static void *tsDnodeShellServer = NULL;
static int32_t tsDnodeQueryReqNum = 0;
static int32_t tsDnodeSubmitReqNum = 0;
void dnodeProcessMsgFromShell(int32_t msgType, void *pCont, int32_t contLen, void *handle, int32_t index) {
void* dnodeProcessMsgFromShell(int8_t msgType, void *pCont, int32_t contLen, void *handle, int32_t index) {
assert(handle != NULL);
if (pCont == NULL || contLen == 0) {
dnodeFreeQInfo(handle);
dTrace("conn:%p, free query info", handle);
return;
return NULL;
}
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
rpcSendSimpleRsp(handle, TSDB_CODE_NOT_READY);
dTrace("conn:%p, query msg is ignored since dnode not running", handle);
return;
return NULL;
}
dTrace("conn:%p, msg:%s is received", handle, taosMsg[msgType]);
......@@ -66,6 +66,8 @@ void dnodeProcessMsgFromShell(int32_t msgType, void *pCont, int32_t contLen, voi
} else {
dError("conn:%p, msg:%s is not processed", handle, taosMsg[msgType]);
}
return NULL;
}
int32_t dnodeInitShell() {
......
......@@ -19,6 +19,7 @@
#include "taoserror.h"
#include "tcrc32c.h"
#include "tlog.h"
#include "tsched.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
......@@ -53,7 +54,7 @@ static int32_t dnodeInitTmrCtl();
void *tsStatusTimer = NULL;
void *vnodeTmrCtrl;
void **tsRpcQhandle;
void *dmQhandle;
void *tsDnodeMgmtQhandle;
void *tsQueryQhandle;
int32_t tsVnodePeers = TSDB_VNODES_SUPPORT - 1;
int32_t tsMaxQueues;
......@@ -298,7 +299,7 @@ static int32_t dnodeInitRpcQHandle() {
tsRpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode");
}
dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt");
tsDnodeMgmtQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt");
return 0;
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodeUtil.h"
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) {
return TSDB_VN_STATUS_MASTER;
}
bool dnodeCheckVnodeExist(int32_t vnode) {
return true;
}
......@@ -14,4 +14,20 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tlog.h"
#include "taoserror.h"
#include "dnodeVnodeMgmt.h"
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) {
return TSDB_VN_STATUS_MASTER;
}
bool dnodeCheckVnodeExist(int32_t vnode) {
return true;
}
bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) {
return true;
}
......@@ -18,6 +18,7 @@
#include "taoserror.h"
#include "tlog.h"
#include "dnodeWrite.h"
#include "dnodeVnodeMgmt.h"
void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)) {
SShellSubmitRspMsg result = {0};
......@@ -32,35 +33,40 @@ void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShe
//TODO: submit implementation
}
int32_t dnodeCreateNormalTable(SCreateNormalTableMsg *table) {
return 0;
int32_t dnodeCreateTable(SDCreateTableMsg *table) {
return TSDB_CODE_SUCCESS;
}
int32_t dnodeCreateStreamTable(SCreateStreamTableMsg *table) {
return 0;
}
int32_t dnodeCreateChildTable(SCreateChildTableMsg *table) {
return 0;
/*
* Remove table from local repository
*/
int32_t dnodeDropTable(int32_t vnode, int32_t sid, uint64_t uid) {
return TSDB_CODE_SUCCESS;
}
int32_t dnodeAlterNormalTable(SCreateNormalTableMsg *table) {
return 0;
}
/*
* Create stream
* if stream already exist, update it
*/
int32_t dnodeCreateStream(SAlterStreamMsg *stream) {
int32_t vnode = htonl(stream->vnode);
int32_t sid = htonl(stream->sid);
uint64_t uid = htobe64(stream->uid);
int32_t dnodeAlterStreamTable(SCreateStreamTableMsg *table) {
return 0;
}
if (!dnodeCheckTableExist(vnode, sid, uid)) {
return TSDB_CODE_INVALID_TABLE;
}
int32_t dnodeAlterChildTable(SCreateChildTableMsg *table) {
return 0;
}
//TODO create or remove stream
int32_t dnodeDropSuperTable(int vid, int sid, int64_t uid) {
return 0;
}
int32_t dnodeDropTable(int vid, int sid, int64_t uid) {
return 0;
/*
* Remove all child tables of supertable from local repository
*/
int32_t dnodeDropSuperTable(uint64_t stableUid) {
return TSDB_CODE_SUCCESS;
}
......@@ -22,7 +22,6 @@ extern "C" {
#include <stdint.h>
#include <pthread.h>
#include "tsched.h"
typedef struct {
int32_t queryReqNum;
......@@ -45,15 +44,16 @@ extern uint32_t tsRebootTime;
extern void (*dnodeStartModules)();
extern void (*dnodeParseParameterK)();
extern int32_t (*dnodeCheckSystem)();
extern char *(*taosBuildRspMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size);
extern char *(*taosBuildReqMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size);
extern char *(*taosBuildRspMsgToMnode)(SMgmtObj *pObj, char type);
extern char *(*taosBuildReqMsgToMnode)(SMgmtObj *pObj, char type);
extern int (*taosSendMsgToMnode)(SMgmtObj *pObj, char *msg, int msgLen);
extern int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code);
extern void (*dnodeInitMgmtIp)();
extern int (*dnodeInitMgmt)();
// dnodeMgmt
void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
extern int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType);
extern int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code);
// dnodeModule
extern void (*dnodeStartModules)();
// multilevelStorage
extern int32_t (*dnodeInitStorage)();
......@@ -61,9 +61,6 @@ extern void (*dnodeCleanupStorage)();
void dnodeCheckDataDirOpenned(const char* dir);
void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched);
void dnodeLockVnodes();
void dnodeUnLockVnodes();
SDnodeStatisInfo dnodeGetStatisInfo();
......
......@@ -20,33 +20,13 @@
extern "C" {
#endif
#include "tglobalcfg.h"
#include "tlog.h"
#define httpError(...) \
if (httpDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR HTP ", 255, __VA_ARGS__); \
}
#define httpWarn(...) \
if (httpDebugFlag & DEBUG_WARN) { \
tprintf("WARN HTP ", httpDebugFlag, __VA_ARGS__); \
}
#define httpTrace(...) \
if (httpDebugFlag & DEBUG_TRACE) { \
tprintf("HTP ", httpDebugFlag, __VA_ARGS__); \
}
#define httpDump(...) \
if (httpDebugFlag & DEBUG_TRACE) { \
taosPrintLongString("HTP ", httpDebugFlag, __VA_ARGS__); \
}
#define httpPrint(...) \
{ tprintf("HTP ", 255, __VA_ARGS__); }
#define httpLError(...) taosLogError(__VA_ARGS__) httpError(__VA_ARGS__)
#define httpLWarn(...) taosLogWarn(__VA_ARGS__) httpWarn(__VA_ARGS__)
#define httpLPrint(...) taosLogPrint(__VA_ARGS__) httpPrint(__VA_ARGS__)
#include <stdint.h>
int32_t httpGetReqCount();
int32_t httpInitSystem();
int32_t httpStartSystem();
void httpStopSystem();
void httpCleanUpSystem();
#ifdef __cplusplus
}
......
......@@ -336,6 +336,14 @@ typedef struct {
} SShowObj;
//mgmtSystem
int32_t mgmtStartSystem();
void mgmtCleanUpSystem();
void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
extern int32_t (*mgmtInitSystem)();
extern void (*mgmtStopSystem)();
extern void (*mgmtCleanUpRedirect)();
#ifdef __cplusplus
}
#endif
......
......@@ -13,16 +13,22 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __MONITOR_SYSTEM_H__
#define __MONITOR_SYSTEM_H__
#ifndef TDENGINE_MONITOR_H
#define TDENGINE_MONITOR_H
#include <stdbool.h>
#ifdef __cplusplus
extern "C" {
#endif
int monitorInitSystem();
int monitorStartSystem();
#include <stdint.h>
int32_t monitorInitSystem();
int32_t monitorStartSystem();
void monitorStopSystem();
void monitorCleanUpSystem();
extern void (*mnodeCountRequestFp)(SDnodeStatisInfo *info);
#ifdef __cplusplus
}
#endif
#endif
\ No newline at end of file
#endif
......@@ -26,7 +26,7 @@ extern "C" {
#ifdef TAOS_ERROR_C
#define TAOS_DEFINE_ERROR(name, mod, code, msg) {.val = (0x80000000 | ((mod)<<16) | (code)), .str=(msg)},
#else
#define TAOS_DEFINE_ERROR(name, mod, code, msg) const int32_t name = (0x80000000 | ((mod)<<16) | (code));
#define TAOS_DEFINE_ERROR(name, mod, code, msg) static const int32_t name = (0x80000000 | ((mod)<<16) | (code));
#endif
#define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code))
......@@ -161,6 +161,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VNODE_STATUS, 0, 116, "invalid vnode s
TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_LOCK_RESOURCES, 0, 117, "failed to lock resources")
TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 118, "table id mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 119, "query cache erased")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, 0, 120, "invalid message")
#ifdef TAOS_ERROR_C
};
......
......@@ -28,118 +28,97 @@ extern "C" {
#include "taosdef.h"
// message type
#define TSDB_MSG_TYPE_REG 1
#define TSDB_MSG_TYPE_REG_RSP 2
#define TSDB_MSG_TYPE_DNODE_SUBMIT 3
#define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4
#define TSDB_MSG_TYPE_DNODE_QUERY 5
#define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6
#define TSDB_MSG_TYPE_DNODE_RETRIEVE 7
#define TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP 8
#define TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE 9
#define TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE_RSP 10
#define TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE 11
#define TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE_RSP 12
#define TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE 13
#define TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE_RSP 14
#define TSDB_MSG_TYPE_DNODE_CREATE_SUPER_TABLE 15
#define TSDB_MSG_TYPE_DNODE_CREATE_SUPER_TABLE_RSP 16
#define TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE 17
#define TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE_RSP 18
#define TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE 19
#define TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE_RSP 20
#define TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE 21
#define TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE_RSP 22
#define TSDB_MSG_TYPE_DNODE_REMOVE_SUPER_TABLE 23
#define TSDB_MSG_TYPE_DNODE_REMOVE_SUPER_TABLE_RSP 24
#define TSDB_MSG_TYPE_DNODE_ALTER_CHILD_TABLE 25
#define TSDB_MSG_TYPE_DNODE_ALTER_CHILD_TABLE_RSP 26
#define TSDB_MSG_TYPE_DNODE_ALTER_NORMAL_TABLE 27
#define TSDB_MSG_TYPE_DNODE_ALTER_NORMAL_TABLE_RSP 28
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_TABLE 29
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_TABLE_RSP 30
#define TSDB_MSG_TYPE_DNODE_ALTER_SUPER_TABLE 31
#define TSDB_MSG_TYPE_DNODE_ALTER_SUPER_TABLE_RSP 32
#define TSDB_MSG_TYPE_DNODE_VPEERS 33
#define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 34
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE 35
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 36
#define TSDB_MSG_TYPE_DNODE_CFG 37
#define TSDB_MSG_TYPE_DNODE_CFG_RSP 38
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 39
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_RSP 40
#define TSDB_MSG_TYPE_SDB_SYNC 41
#define TSDB_MSG_TYPE_SDB_SYNC_RSP 42
#define TSDB_MSG_TYPE_SDB_FORWARD 43
#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 44
#define TSDB_MSG_TYPE_CONNECT 51
#define TSDB_MSG_TYPE_CONNECT_RSP 52
#define TSDB_MSG_TYPE_CREATE_ACCT 53
#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 54
#define TSDB_MSG_TYPE_ALTER_ACCT 55
#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 56
#define TSDB_MSG_TYPE_DROP_ACCT 57
#define TSDB_MSG_TYPE_DROP_ACCT_RSP 58
#define TSDB_MSG_TYPE_CREATE_USER 59
#define TSDB_MSG_TYPE_CREATE_USER_RSP 60
#define TSDB_MSG_TYPE_ALTER_USER 61
#define TSDB_MSG_TYPE_ALTER_USER_RSP 62
#define TSDB_MSG_TYPE_DROP_USER 63
#define TSDB_MSG_TYPE_DROP_USER_RSP 64
#define TSDB_MSG_TYPE_CREATE_MNODE 65
#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 66
#define TSDB_MSG_TYPE_DROP_MNODE 67
#define TSDB_MSG_TYPE_DROP_MNODE_RSP 68
#define TSDB_MSG_TYPE_CREATE_DNODE 69
#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 70
#define TSDB_MSG_TYPE_DROP_DNODE 71
#define TSDB_MSG_TYPE_DROP_DNODE_RSP 72
#define TSDB_MSG_TYPE_ALTER_DNODE 73
#define TSDB_MSG_TYPE_ALTER_DNODE_RSP 74
#define TSDB_MSG_TYPE_CREATE_DB 75
#define TSDB_MSG_TYPE_CREATE_DB_RSP 76
#define TSDB_MSG_TYPE_DROP_DB 77
#define TSDB_MSG_TYPE_DROP_DB_RSP 78
#define TSDB_MSG_TYPE_USE_DB 79
#define TSDB_MSG_TYPE_USE_DB_RSP 80
#define TSDB_MSG_TYPE_ALTER_DB 81
#define TSDB_MSG_TYPE_ALTER_DB_RSP 82
#define TSDB_MSG_TYPE_CREATE_TABLE 83
#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 84
#define TSDB_MSG_TYPE_DROP_TABLE 85
#define TSDB_MSG_TYPE_DROP_TABLE_RSP 86
#define TSDB_MSG_TYPE_ALTER_TABLE 87
#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 88
#define TSDB_MSG_TYPE_VNODE_CFG 89
#define TSDB_MSG_TYPE_VNODE_CFG_RSP 90
#define TSDB_MSG_TYPE_TABLE_CFG 91
#define TSDB_MSG_TYPE_TABLE_CFG_RSP 92
#define TSDB_MSG_TYPE_TABLE_META 93
#define TSDB_MSG_TYPE_TABLE_META_RSP 94
#define TSDB_MSG_TYPE_STABLE_META 95
#define TSDB_MSG_TYPE_STABLE_META_RSP 96
#define TSDB_MSG_TYPE_MULTI_TABLE_META 97
#define TSDB_MSG_TYPE_MULTI_TABLE_META_RSP 98
#define TSDB_MSG_TYPE_ALTER_STREAM 99
#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 100
#define TSDB_MSG_TYPE_SHOW 101
#define TSDB_MSG_TYPE_SHOW_RSP 102
#define TSDB_MSG_TYPE_CFG_MNODE 103
#define TSDB_MSG_TYPE_CFG_MNODE_RSP 104
#define TSDB_MSG_TYPE_KILL_QUERY 105
#define TSDB_MSG_TYPE_KILL_QUERY_RSP 106
#define TSDB_MSG_TYPE_KILL_STREAM 107
#define TSDB_MSG_TYPE_KILL_STREAM_RSP 108
#define TSDB_MSG_TYPE_KILL_CONNECTION 109
#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 110
#define TSDB_MSG_TYPE_HEARTBEAT 111
#define TSDB_MSG_TYPE_HEARTBEAT_RSP 112
#define TSDB_MSG_TYPE_STATUS 113
#define TSDB_MSG_TYPE_STATUS_RSP 114
#define TSDB_MSG_TYPE_GRANT 115
#define TSDB_MSG_TYPE_GRANT_RSP 116
#define TSDB_MSG_TYPE_MAX 117
#define TSDB_MSG_TYPE_REG 1
#define TSDB_MSG_TYPE_REG_RSP 2
#define TSDB_MSG_TYPE_DNODE_SUBMIT 3
#define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4
#define TSDB_MSG_TYPE_DNODE_QUERY 5
#define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6
#define TSDB_MSG_TYPE_DNODE_RETRIEVE 7
#define TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP 8
#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE 9
#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10
#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11
#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP 12
#define TSDB_MSG_TYPE_DNODE_VPEERS 13
#define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 14
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE 15
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 16
#define TSDB_MSG_TYPE_DNODE_CFG 17
#define TSDB_MSG_TYPE_DNODE_CFG_RSP 18
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 19
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_RSP 20
#define TSDB_MSG_TYPE_SDB_SYNC 21
#define TSDB_MSG_TYPE_SDB_SYNC_RSP 22
#define TSDB_MSG_TYPE_SDB_FORWARD 23
#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 24
#define TSDB_MSG_TYPE_CONNECT 31
#define TSDB_MSG_TYPE_CONNECT_RSP 32
#define TSDB_MSG_TYPE_CREATE_ACCT 33
#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 34
#define TSDB_MSG_TYPE_ALTER_ACCT 35
#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 36
#define TSDB_MSG_TYPE_DROP_ACCT 37
#define TSDB_MSG_TYPE_DROP_ACCT_RSP 38
#define TSDB_MSG_TYPE_CREATE_USER 39
#define TSDB_MSG_TYPE_CREATE_USER_RSP 40
#define TSDB_MSG_TYPE_ALTER_USER 41
#define TSDB_MSG_TYPE_ALTER_USER_RSP 42
#define TSDB_MSG_TYPE_DROP_USER 43
#define TSDB_MSG_TYPE_DROP_USER_RSP 44
#define TSDB_MSG_TYPE_CREATE_MNODE 45
#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 46
#define TSDB_MSG_TYPE_DROP_MNODE 47
#define TSDB_MSG_TYPE_DROP_MNODE_RSP 48
#define TSDB_MSG_TYPE_CREATE_DNODE 49
#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 50
#define TSDB_MSG_TYPE_DROP_DNODE 51
#define TSDB_MSG_TYPE_DROP_DNODE_RSP 52
#define TSDB_MSG_TYPE_ALTER_DNODE 53
#define TSDB_MSG_TYPE_ALTER_DNODE_RSP 54
#define TSDB_MSG_TYPE_CREATE_DB 55
#define TSDB_MSG_TYPE_CREATE_DB_RSP 56
#define TSDB_MSG_TYPE_DROP_DB 57
#define TSDB_MSG_TYPE_DROP_DB_RSP 58
#define TSDB_MSG_TYPE_USE_DB 59
#define TSDB_MSG_TYPE_USE_DB_RSP 60
#define TSDB_MSG_TYPE_ALTER_DB 61
#define TSDB_MSG_TYPE_ALTER_DB_RSP 62
#define TSDB_MSG_TYPE_CREATE_TABLE 63
#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 64
#define TSDB_MSG_TYPE_DROP_TABLE 65
#define TSDB_MSG_TYPE_DROP_TABLE_RSP 66
#define TSDB_MSG_TYPE_ALTER_TABLE 67
#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 68
#define TSDB_MSG_TYPE_VNODE_CFG 69
#define TSDB_MSG_TYPE_VNODE_CFG_RSP 70
#define TSDB_MSG_TYPE_TABLE_CFG 71
#define TSDB_MSG_TYPE_TABLE_CFG_RSP 72
#define TSDB_MSG_TYPE_TABLE_META 73
#define TSDB_MSG_TYPE_TABLE_META_RSP 74
#define TSDB_MSG_TYPE_STABLE_META 75
#define TSDB_MSG_TYPE_STABLE_META_RSP 76
#define TSDB_MSG_TYPE_MULTI_TABLE_META 77
#define TSDB_MSG_TYPE_MULTI_TABLE_META_RSP 78
#define TSDB_MSG_TYPE_ALTER_STREAM 79
#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 80
#define TSDB_MSG_TYPE_SHOW 81
#define TSDB_MSG_TYPE_SHOW_RSP 82
#define TSDB_MSG_TYPE_CFG_MNODE 83
#define TSDB_MSG_TYPE_CFG_MNODE_RSP 84
#define TSDB_MSG_TYPE_KILL_QUERY 85
#define TSDB_MSG_TYPE_KILL_QUERY_RSP 86
#define TSDB_MSG_TYPE_KILL_STREAM 87
#define TSDB_MSG_TYPE_KILL_STREAM_RSP 88
#define TSDB_MSG_TYPE_KILL_CONNECTION 89
#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 90
#define TSDB_MSG_TYPE_HEARTBEAT 91
#define TSDB_MSG_TYPE_HEARTBEAT_RSP 92
#define TSDB_MSG_TYPE_STATUS 93
#define TSDB_MSG_TYPE_STATUS_RSP 94
#define TSDB_MSG_TYPE_GRANT 95
#define TSDB_MSG_TYPE_GRANT_RSP 96
#define TSDB_MSG_TYPE_MAX 97
// IE type
#define TSDB_IE_TYPE_SEC 1
......@@ -313,72 +292,28 @@ typedef struct SSchema {
short bytes;
} SSchema;
typedef struct SMColumn {
typedef struct {
int8_t type;
int16_t colId;
int16_t bytes;
} SMColumn;
typedef struct {
int32_t size;
int8_t* data;
} SVariableMsg;
typedef struct {
short vnode;
int32_t sid;
uint64_t uid;
char spi;
char encrypt;
char meterId[TSDB_TABLE_ID_LEN];
char secret[TSDB_KEY_LEN];
char cipheringKey[TSDB_KEY_LEN];
uint64_t timeStamp;
uint64_t lastCreate;
short numOfColumns;
short sqlLen; // SQL string is after schema
char reserved[16];
int32_t sversion;
SMColumn schema[];
} SCreateMsg;
} SDTableColumn;
typedef struct {
int32_t vnode;
int32_t sid;
uint64_t uid;
char tableId[TSDB_TABLE_ID_LEN + 1];
char superTableId[TSDB_TABLE_ID_LEN + 1];
uint64_t createdTime;
uint64_t superTableUid;
int32_t tableType;
int32_t sversion;
int16_t numOfColumns;
int16_t numOfTags;
int32_t tagDataLen;
int8_t data[];
} SCreateChildTableMsg;
typedef struct {
int32_t vnode;
int32_t sid;
uint64_t uid;
char tableId[TSDB_TABLE_ID_LEN + 1];
int32_t sqlDataLen;
uint64_t createdTime;
int32_t sversion;
int16_t numOfColumns;
int8_t data[];
} SCreateNormalTableMsg;
typedef struct {
int32_t vnode;
int32_t sid;
uint64_t uid;
char tableId[TSDB_TABLE_ID_LEN + 1];
uint64_t createdTime;
int32_t sversion;
int16_t numOfColumns;
int32_t sqlLen;
char superTableId[TSDB_TABLE_ID_LEN + 1];
int8_t data[];
} SCreateStreamTableMsg;
} SDCreateTableMsg;
typedef struct {
char db[TSDB_TABLE_ID_LEN];
......@@ -468,10 +403,10 @@ typedef struct {
int32_t sid;
uint64_t uid;
char meterId[TSDB_TABLE_ID_LEN];
} SRemoveMeterMsg;
} SDRemoveTableMsg;
typedef struct {
short vnode;
int32_t vnode;
} SFreeVnodeMsg;
typedef struct SColIndexEx {
......@@ -753,6 +688,7 @@ typedef struct {
typedef struct {
int32_t dnode; //the ID of dnode
int32_t vnode; //the index of vnode
uint32_t ip;
} SVPeerDesc;
typedef struct {
......@@ -923,11 +859,11 @@ typedef struct {
} SKillQuery, SKillStream, SKillConnection;
typedef struct {
short vnode;
int32_t vnode;
int32_t sid;
uint64_t uid;
uint64_t stime; // stream starting time
char status;
int32_t status;
} SAlterStreamMsg;
#pragma pack(pop)
......
......@@ -19,6 +19,10 @@
extern "C" {
#endif
#include <stdbool.h>
#include <stdint.h>
#include "taosdef.h"
#define TAOS_CONN_UDPS 0
#define TAOS_CONN_UDPC 1
#define TAOS_CONN_TCPS 2
......@@ -35,8 +39,8 @@ extern "C" {
extern int tsRpcHeadSize;
typedef struct {
int8_t index;
int8_t numOfIps;
int16_t index;
int16_t numOfIps;
uint16_t port;
uint32_t ip[TSDB_MAX_MPEERS];
} SRpcIpSet;
......
......@@ -38,14 +38,11 @@ int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid);
char *taosBuildRspMsgToDnode(SDnodeObj *pObj, char type);
char *taosBuildReqMsgToDnode(SDnodeObj *pObj, char type);
extern char* (*taosBuildRspMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size);
extern char* (*taosBuildReqMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size);
extern int32_t (*taosSendSimpleRspToDnode)(SDnodeObj *pObj, char rsptype, char code);
extern int32_t (*taosSendMsgToDnode)(SDnodeObj *pObj, char *msg, int32_t msgLen);
extern int32_t (*mgmtSendSimpleRspToDnode)(int32_t msgType, int32_t code);
extern int32_t (*mgmtSendMsgToDnode)(SDnodeObj *pObj, char *msg, int msgLen);
extern int32_t (*mgmtInitDnodeInt)();
extern void (*mgmtCleanUpDnodeInt)();
extern void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId);
extern void (*mgmtProcessMsgFromDnodeSpec)(SSchedMsg *sched);
#ifdef __cplusplus
......
......@@ -50,7 +50,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
SVgObj * pVgroup;
if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_REDIRECT);
mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_REDIRECT);
return 0;
}
......@@ -59,7 +59,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
pStart = taosBuildRspMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, 64000);
if (pStart == NULL) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
......@@ -78,7 +78,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
pMsg = mgmtBuildCreateMeterIe(pTable, pMsg, vnode);
} else {
mTrace("dnode:%s, vnode:%d sid:%d, meter not there", taosIpStr(pObj->privateIp), vnode, sid);
*pMsg = TSDB_CODE_INVALID_METER_ID;
*pMsg = TSDB_CODE_INVALID_TABLE_ID;
pMsg++;
*(int32_t *)pMsg = htonl(vnode);
......@@ -88,7 +88,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
}
msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen);
mgmtSendMsgToDnode(pObj, pStart, msgLen);
return 0;
}
......@@ -100,7 +100,7 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
SVgObj * pVgroup = NULL;
if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_REDIRECT);
mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_REDIRECT);
return 0;
}
......@@ -108,7 +108,7 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
pStart = taosBuildRspMsgToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP);
if (pStart == NULL) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
pMsg = pStart;
......@@ -129,7 +129,7 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
}
msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen);
mgmtSendMsgToDnode(pObj, pStart, msgLen);
return 0;
}
......@@ -142,7 +142,7 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) {
STaosRsp *pRsp = (STaosRsp *)msg;
if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP, TSDB_CODE_REDIRECT);
mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP, TSDB_CODE_REDIRECT);
return 0;
}
......@@ -251,7 +251,7 @@ int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int
int8_t *pMsg = mgmtBuildCreateChildTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode, tagDataLen, pTagData);
int32_t msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen);
mgmtSendMsgToDnode(pObj, pStart, msgLen);
}
pVgroup->lastCreate = timeStamp;
......@@ -275,7 +275,7 @@ int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) {
int8_t *pMsg = mgmtBuildCreateStreamTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode);
int32_t msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen);
mgmtSendMsgToDnode(pObj, pStart, msgLen);
}
pVgroup->lastCreate = timeStamp;
......@@ -299,7 +299,7 @@ int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) {
int8_t *pMsg = mgmtBuildCreateNormalTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode);
int32_t msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen);
mgmtSendMsgToDnode(pObj, pStart, msgLen);
}
pVgroup->lastCreate = timeStamp;
......@@ -307,7 +307,7 @@ int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) {
}
int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) {
SRemoveMeterMsg *pRemove;
SDRemoveTableMsg *pRemove;
char * pMsg, *pStart;
int i, msgLen = 0;
SDnodeObj * pObj;
......@@ -326,15 +326,15 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) {
if (pStart == NULL) continue;
pMsg = pStart;
pRemove = (SRemoveMeterMsg *)pMsg;
pRemove = (SDRemoveTableMsg *)pMsg;
pRemove->vnode = htons(pVgroup->vnodeGid[i].vnode);
pRemove->sid = htonl(pTable->gid.sid);
memcpy(pRemove->meterId, pTable->meterId, TSDB_TABLE_ID_LEN);
pMsg += sizeof(SRemoveMeterMsg);
pMsg += sizeof(SDRemoveTableMsg);
msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen);
mgmtSendMsgToDnode(pObj, pStart, msgLen);
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].ip);
mTrace("dnode:%s vid:%d, send remove meter msg, sid:%d status:%d", ipstr, pVgroup->vnodeGid[i].vnode,
......@@ -371,7 +371,7 @@ int mgmtSendAlterStreamMsgToDnode(STabObj *pTable, SVgObj *pVgroup) {
pMsg += sizeof(SAlterStreamMsg);
msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen);
mgmtSendMsgToDnode(pObj, pStart, msgLen);
}
return 0;
......@@ -433,7 +433,7 @@ int mgmtSendVPeersMsg(SVgObj *pVgroup) {
pMsg = mgmtBuildVpeersIe(pStart, pVgroup, pVgroup->vnodeGid[i].vnode);
msgLen = pMsg - pStart;
taosSendMsgToDnode(pDnode, pStart, msgLen);
mgmtSendMsgToDnode(pDnode, pStart, msgLen);
}
}
......@@ -467,7 +467,7 @@ int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid) {
pMsg += sizeof(SFreeVnodeMsg);
msgLen = pMsg - pStart;
taosSendMsgToDnode(pDnode, pStart, msgLen);
mgmtSendMsgToDnode(pDnode, pStart, msgLen);
return 0;
}
......@@ -547,7 +547,7 @@ int mgmtSendCfgDnodeMsg(char *cont) {
pMsg += sizeof(SCfgMsg);
msgLen = pMsg - pStart;
taosSendMsgToDnode(pDnode, pStart, msgLen);
mgmtSendMsgToDnode(pDnode, pStart, msgLen);
#else
(void)tsCfgDynamicOptions(pCfg->config);
#endif
......@@ -559,61 +559,46 @@ int mgmtSendCfgDnodeMsg(char *cont) {
* functions for communicate between dnode and mnode
*/
extern void *dmQhandle;
extern void *tsDnodeMgmtQhandle;
void * mgmtStatusTimer = NULL;
void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj);
char* taosBuildRspMsgToDnodeWithSizeImp(SDnodeObj *pObj, char type, int32_t size) {
char *pStart = (char *)malloc(size);
if (pStart == NULL) {
return NULL;
}
void mgmtSendMsgToDnodeImpFp(SSchedMsg *sched) {
int8_t msgType = *(int8_t *) (sched->msg - sizeof(int32_t) - sizeof(int8_t));
int32_t contLen = *(int32_t *) (sched->msg - sizeof(int8_t));
int8_t *pCont = sched->msg;
void *pConn = NULL;
*pStart = type;
return pStart + 1;
dnodeProcessMsgFromMgmt(pCont, contLen, msgType, pConn);
rpcFreeCont(sched->msg);
}
char* (*taosBuildRspMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size) = taosBuildRspMsgToDnodeWithSizeImp;
char* taosBuildReqMsgToDnodeWithSizeImp(SDnodeObj *pObj, char type, int32_t size) {
char *pStart = (char *)malloc(size);
if (pStart == NULL) {
return NULL;
}
int32_t mgmtSendMsgToDnodeImp(int8_t *pCont, int32_t contLen, int8_t msgType) {
mTrace("msg:%s is sent to dnode", taosMsg[msgType]);
*(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType;
*(int32_t *) (pCont - sizeof(int8_t)) = contLen;
*pStart = type;
return pStart + 1;
}
char* (*taosBuildReqMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size) = taosBuildReqMsgToDnodeWithSizeImp;
SSchedMsg schedMsg = {0};
schedMsg.fp = mgmtSendMsgToDnodeImpFp;
schedMsg.msg = pCont;
char *taosBuildRspMsgToDnode(SDnodeObj *pObj, char type) {
return taosBuildRspMsgToDnodeWithSize(pObj, type, 256);
}
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
char *taosBuildReqMsgToDnode(SDnodeObj *pObj, char type) {
return taosBuildReqMsgToDnodeWithSize(pObj, type, 256);
return TSDB_CODE_SUCCESS;
}
int32_t taosSendSimpleRspToDnodeImp(SDnodeObj *pObj, char rsptype, char code) { return 0; }
int32_t (*taosSendSimpleRspToDnode)(SDnodeObj *pObj, char rsptype, char code) = taosSendSimpleRspToDnodeImp;
int32_t (*mgmtSendMsgToDnode)(SDnodeObj *pObj, char *msg, int msgLen) = mgmtSendMsgToDnodeImp;
int32_t taosSendMsgToDnodeImp(SDnodeObj *pObj, char *msg, int32_t msgLen) {
mTrace("msg:%s is sent to dnode", taosMsg[(uint8_t)(*(msg-1))]);
int32_t mgmtSendSimpleRspToDnodeImp(int32_t msgType, int32_t code) {
int8_t *pCont = rpcMallocCont(sizeof(int32_t));
*(int32_t *) pCont = code;
/*
* Lite version has no message header, so minus one
*/
SSchedMsg schedMsg;
schedMsg.fp = dnodeProcessMsgFromMgmtImp;
schedMsg.msg = msg - 1;
schedMsg.ahandle = NULL;
schedMsg.thandle = NULL;
taosScheduleTask(dmQhandle, &schedMsg);
return 0;
mgmtSendMsgToDnodeImp(pCont, sizeof(int32_t), msgType);
return TSDB_CODE_SUCCESS;
}
int32_t (*taosSendMsgToDnode)(SDnodeObj *pObj, char *msg, int msgLen) = taosSendMsgToDnodeImp;
int32_t (*mgmtSendSimpleRspToDnode)(int32_t msgType, int32_t code) = mgmtSendSimpleRspToDnodeImp;
int32_t mgmtInitDnodeIntImp() { return 0; }
int32_t (*mgmtInitDnodeInt)() = mgmtInitDnodeIntImp;
......@@ -682,13 +667,3 @@ void mgmtProcessDnodeStatusImp(void *handle, void *tmrId) {
*/
}
void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId) = mgmtProcessDnodeStatusImp;
void mgmtProcessMsgFromDnodeSpecImp(SSchedMsg *sched) {
char msgType = *sched->msg;
char *content = sched->msg + 1;
mTrace("msg:%s is received from dnode", taosMsg[(uint8_t)msgType]);
mgmtProcessMsgFromDnode(content, 0, msgType, mgmtGetDnode(0));
free(sched->msg);
}
void (*mgmtProcessMsgFromDnodeSpec)(SSchedMsg *sched) = mgmtProcessMsgFromDnodeSpecImp;
......@@ -162,7 +162,10 @@ int mgmtStartSystem() {
return 0;
}
int32_t mgmtInitSystemImp() { return mgmtStartSystem(); }
int32_t mgmtInitSystemImp() {
return mgmtStartSystem();
}
int32_t (*mgmtInitSystem)() = mgmtInitSystemImp;
int32_t mgmtCheckMgmtRunningImp() { return 0; }
......@@ -177,6 +180,7 @@ void mgmtStartMgmtTimerImp() {
void (*mgmtStartMgmtTimer)() = mgmtStartMgmtTimerImp;
void mgmtStopSystemImp() {}
void (*mgmtStopSystem)() = mgmtStopSystemImp;
void mgmtCleanUpRedirectImp() {}
......
......@@ -296,11 +296,11 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
if (pShow->payloadLen > 0 ) {
pTable = mgmtGetTable(pShow->payload);
if (NULL == pTable) {
return TSDB_CODE_INVALID_METER_ID;
return TSDB_CODE_INVALID_TABLE_ID;
}
pVgroup = mgmtGetVgroup(pTable->gid.vgId);
if (NULL == pVgroup) return TSDB_CODE_INVALID_METER_ID;
if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID;
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
} else {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __MONITOR_H__
#define __MONITOR_H__
#include "tglobalcfg.h"
#include "tlog.h"
#define monitorError(...) \
if (monitorDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR MON ", 255, __VA_ARGS__); \
}
#define monitorWarn(...) \
if (monitorDebugFlag & DEBUG_WARN) { \
tprintf("WARN MON ", monitorDebugFlag, __VA_ARGS__); \
}
#define monitorTrace(...) \
if (monitorDebugFlag & DEBUG_TRACE) { \
tprintf("MON ", monitorDebugFlag, __VA_ARGS__); \
}
#define monitorPrint(...) \
{ tprintf("MON ", 255, __VA_ARGS__); }
#define monitorLError(...) taosLogError(__VA_ARGS__) monitorError(__VA_ARGS__)
#define monitorLWarn(...) taosLogWarn(__VA_ARGS__) monitorWarn(__VA_ARGS__)
#define monitorLPrint(...) taosLogPrint(__VA_ARGS__) monitorPrint(__VA_ARGS__)
#endif
\ No newline at end of file
......@@ -16,9 +16,19 @@
#ifndef TDENGINE_HTTP_SYSTEM_H
#define TDENGINE_HTTP_SYSTEM_H
int httpInitSystem();
int httpStartSystem();
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
int32_t httpInitSystem();
int32_t httpStartSystem();
void httpStopSystem();
void httpCleanUpSystem();
#ifdef __cplusplus
}
#endif
#endif
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_UTIL_H
#define TDENGINE_DNODE_UTIL_H
#ifndef TDENGINE_MONITOR_SYSTEM_H
#define TDENGINE_MONITOR_SYSTEM_H
#ifdef __cplusplus
extern "C" {
......@@ -22,18 +22,14 @@ extern "C" {
#include <stdbool.h>
#include <stdint.h>
#include "taosdef.h"
#include "taosmsg.h"
#include "tstatus.h"
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode);
bool dnodeCheckVnodeExist(int32_t vnode);
void *dnodeGetVnodeObj(int32_t vnode);
int32_t monitorInitSystem();
int32_t monitorStartSystem();
void monitorStopSystem();
void monitorCleanUpSystem();
#ifdef __cplusplus
}
#endif
#endif
#endif
\ No newline at end of file
......@@ -23,3 +23,6 @@ ENDIF ()
ADD_LIBRARY(trpc ${SRC})
TARGET_LINK_LIBRARIES(trpc tutil)
ADD_SUBDIRECTORY(test)
......@@ -16,10 +16,18 @@
#ifndef _rpc_hash_ip_header_
#define _rpc_hash_ip_header_
#ifdef __cplusplus
extern "C" {
#endif
void *taosOpenIpHash(int maxSessions);
void taosCloseIpHash(void *handle);
void *taosAddIpHash(void *handle, void *pData, uint32_t ip, uint16_t port);
void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port);
void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port);
#ifdef __cplusplus
}
#endif
#endif
......@@ -16,6 +16,10 @@
#ifndef _taos_tcp_client_header_
#define _taos_tcp_client_header_
#ifdef __cplusplus
extern "C" {
#endif
#include "taosdef.h"
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle);
......@@ -24,4 +28,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16
void taosCloseTcpClientConnection(void *chandle);
int taosSendTcpClientData(uint32_t ip, uint16_t port, char *data, int len, void *chandle);
#ifdef __cplusplus
}
#endif
#endif
......@@ -16,6 +16,10 @@
#ifndef _taos_tcp_server_header_
#define _taos_tcp_server_header_
#ifdef __cplusplus
extern "C" {
#endif
#include "taosdef.h"
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
......@@ -23,4 +27,8 @@ void taosCleanUpTcpServer(void *param);
void taosCloseTcpServerConnection(void *param);
int taosSendTcpServerData(uint32_t ip, uint16_t port, char *data, int len, void *chandle);
#ifdef __cplusplus
}
#endif
#endif
......@@ -16,6 +16,10 @@
#ifndef _taos_udp_header_
#define _taos_udp_header_
#ifdef __cplusplus
extern "C" {
#endif
#include "taosdef.h"
void *taosInitUdpServer(char *ip, uint16_t port, char *label, int, void *fp, void *shandle);
......@@ -30,4 +34,8 @@ void taosSendMsgHdr(void *hdr, int fd);
void taosInitMsgHdr(void **hdr, void *dest, int maxPkts);
void taosSetMsgHdrData(void *hdr, char *data, int dataLen);
#ifdef __cplusplus
}
#endif
#endif
......@@ -30,6 +30,7 @@
#include "lz4.h"
#include "tconncache.h"
#include "trpc.h"
#include "taoserror.h"
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest))
#define rpcHeaderFromCont(cont) ((SRpcHeader *) (cont - sizeof(SRpcHeader)))
......
......@@ -26,30 +26,8 @@ char *taosMsg[] = {
"create-table",
"create-table-rsp", //10
"create-normal-table",
"create-normal-table-rsp",
"create-stream-table",
"create-stream-table-rsp",
"create-super-table",
"create-super-table-rsp",
"remove-table",
"remove-table-rsp",
"remove-normal-table",
"remove-normal-table-rsp", //20
"remove-stream-table",
"remove-stream-table-rsp",
"remove-super-table",
"remove-super-table-rsp",
"alter-table",
"alter-table-rsp",
"alter-normal-table",
"alter-normal-table-rsp",
"alter-stream-table",
"alter-stream-table-rsp", //30
"alter-super-table",
"alter-super-table-rsp",
"vpeers",
"vpeers-rsp",
"free-vnode",
......@@ -57,7 +35,7 @@ char *taosMsg[] = {
"cfg-dnode",
"cfg-dnode-rsp",
"alter-stream",
"alter-stream-rsp", //40
"alter-stream-rsp", //20
"sync",
"sync-rsp",
......@@ -68,7 +46,7 @@ char *taosMsg[] = {
"",
"",
"",
"", //50
"", //30
"connect",
"connect-rsp",
......@@ -79,7 +57,7 @@ char *taosMsg[] = {
"drop-acct",
"drop-acct-rsp",
"create-user",
"create-user-rsp", //60
"create-user-rsp", //40
"alter-user",
"alter-user-rsp",
......@@ -90,7 +68,7 @@ char *taosMsg[] = {
"drop-mnode",
"drop-mnode-rsp",
"create-dnode",
"create-dnode-rsp", //70
"create-dnode-rsp", //50
"drop-dnode",
"drop-dnode-rsp",
......@@ -101,7 +79,7 @@ char *taosMsg[] = {
"drop-db",
"drop-db-rsp",
"use-db",
"use-db-rsp", //80
"use-db-rsp", //60
"alter-db",
"alter-db-rsp",
......@@ -112,7 +90,7 @@ char *taosMsg[] = {
"alter-table",
"alter-table-rsp",
"cfg-vnode",
"cfg-vnode-rsp", //90
"cfg-vnode-rsp", //70
"cfg-table",
"cfg-table-rsp",
......@@ -123,7 +101,7 @@ char *taosMsg[] = {
"multi-table-meta",
"multi-table-meta-rsp",
"alter-stream",
"alter-stream-rsp", //100
"alter-stream-rsp", //80
"show",
"show-rsp",
......@@ -134,7 +112,7 @@ char *taosMsg[] = {
"kill-stream",
"kill-stream-rsp",
"kill-connection",
"kill-connectoin-rsp", //110
"kill-connectoin-rsp", //90
"heart-beat",
"heart-beat-rsp",
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(../inc)
AUX_SOURCE_DIRECTORY(./ TEST_SRC)
ADD_EXECUTABLE(rpcTest ${TEST_SRC})
TARGET_LINK_LIBRARIES(rpcTest trpc)
ENDIF ()
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
#include "os.h"
#include "tlog.h"
#include "trpc.h"
#include <stdint.h>
int32_t main(int32_t argc, char *argv[]) {
dPrint("unit test for rpc module");
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = "0.0.0.0";
rpcInit.localPort = 7000;
rpcInit.label = "unittest";
rpcInit.numOfThreads = 1;
rpcInit.fp = NULL;
rpcInit.sessions = 1000;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S();
rpcInit.idleTime = 2000;
void *pConn = rpcOpen(&rpcInit);
if (pConn != NULL) {
dPrint("conection is opened");
} else {
dError("failed to initialize rpc");
}
return 0;
}
......@@ -197,6 +197,48 @@ extern uint32_t cdebugFlag;
#define mLWarn(...) taosLogWarn(__VA_ARGS__) mWarn(__VA_ARGS__)
#define mLPrint(...) taosLogPrint(__VA_ARGS__) mPrint(__VA_ARGS__)
#define httpError(...) \
if (httpDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR HTP ", 255, __VA_ARGS__); \
}
#define httpWarn(...) \
if (httpDebugFlag & DEBUG_WARN) { \
tprintf("WARN HTP ", httpDebugFlag, __VA_ARGS__); \
}
#define httpTrace(...) \
if (httpDebugFlag & DEBUG_TRACE) { \
tprintf("HTP ", httpDebugFlag, __VA_ARGS__); \
}
#define httpDump(...) \
if (httpDebugFlag & DEBUG_TRACE) { \
taosPrintLongString("HTP ", httpDebugFlag, __VA_ARGS__); \
}
#define httpPrint(...) \
{ tprintf("HTP ", 255, __VA_ARGS__); }
#define httpLError(...) taosLogError(__VA_ARGS__) httpError(__VA_ARGS__)
#define httpLWarn(...) taosLogWarn(__VA_ARGS__) httpWarn(__VA_ARGS__)
#define httpLPrint(...) taosLogPrint(__VA_ARGS__) httpPrint(__VA_ARGS__)
#define monitorError(...) \
if (monitorDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR MON ", 255, __VA_ARGS__); \
}
#define monitorWarn(...) \
if (monitorDebugFlag & DEBUG_WARN) { \
tprintf("WARN MON ", monitorDebugFlag, __VA_ARGS__); \
}
#define monitorTrace(...) \
if (monitorDebugFlag & DEBUG_TRACE) { \
tprintf("MON ", monitorDebugFlag, __VA_ARGS__); \
}
#define monitorPrint(...) \
{ tprintf("MON ", 255, __VA_ARGS__); }
#define monitorLError(...) taosLogError(__VA_ARGS__) monitorError(__VA_ARGS__)
#define monitorLWarn(...) taosLogWarn(__VA_ARGS__) monitorWarn(__VA_ARGS__)
#define monitorLPrint(...) taosLogPrint(__VA_ARGS__) monitorPrint(__VA_ARGS__)
#ifdef __cplusplus
}
#endif
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册