diff --git a/documentation/webdocs/markdowndocs/Super Table-ch.md b/documentation/webdocs/markdowndocs/Super Table-ch.md index 524fb51b17566955670966e2c51e7cb38e5df67f..38e6f8c17f900ad9597366f5131b3f1eebeb8863 100644 --- a/documentation/webdocs/markdowndocs/Super Table-ch.md +++ b/documentation/webdocs/markdowndocs/Super Table-ch.md @@ -1,6 +1,6 @@ # 超级表STable:多表聚合 -TDengine要求每个数据采集点单独建表,这样能极大提高数据的插入/查询性能,但是导致系统中表的数量猛增,让应用对表的维护以及聚合、统计操作难度加大。为降低应用的开发难度,TDengine引入了超级表STable (Super Table)的概念。 +TDengine要求每个数据采集点单独建表。独立建表的模式能够避免写入过程中的同步加锁,因此能够极大地提升数据的插入/查询性能。但是独立建表意味着系统中表的数量与采集点的数量在同一个量级。如果采集点众多,将导致系统中表的数量也非常庞大,让应用对表的维护以及聚合、统计操作难度加大。为降低应用的开发难度,TDengine引入了超级表(Super Table, 简称为STable)的概念。 ## 什么是超级表 @@ -9,14 +9,14 @@ STable是同一类型数据采集点的抽象,是同类型采集实例的集 TDengine扩展标准SQL语法用于定义STable,使用关键词tags指定标签信息。语法如下: ```mysql -CREATE TABLE ( TIMESTAMP, field_name1 field_type,…) TAGS(tag_name tag_type, …) +CREATE TABLE ( TIMESTAMP, field_name1 field_type,…) TAGS(tag_name tag_type, …) ``` -其中tag_name是标签名,tag_type是标签的数据类型。标签可以使用时间戳之外的其他TDengine支持的数据类型,标签的个数最多为6个,名字不能与系统关键词相同,也不能与其他列名相同。如: +其中tag_name是标签名,tag_type是标签的数据类型。标签可以使用时间戳之外的其他TDengine支持的数据类型,标签的个数最多为32个,名字不能与系统关键词相同,也不能与其他列名相同。如: ```mysql -create table thermometer (ts timestamp, degree float) -tags (location binary(20), type int) +CREATE TABLE thermometer (ts timestamp, degree float) +TAGS (location binary(20), type int) ``` 上述SQL创建了一个名为thermometer的STable,带有标签location和标签type。 @@ -30,7 +30,7 @@ CREATE TABLE USING TAGS (tag_value1,...) 沿用上面温度计的例子,使用超级表thermometer建立单个温度计数据表的语句如下: ```mysql -create table t1 using thermometer tags ('beijing', 10) +CREATE TABLE t1 USING thermometer TAGS ('beijing', 10) ``` 上述SQL以thermometer为模板,创建了名为t1的表,这张表的Schema就是thermometer的Schema,但标签location值为'beijing',标签type值为10。 diff --git a/documentation/webdocs/markdowndocs/advanced features-ch.md b/documentation/webdocs/markdowndocs/advanced features-ch.md index fc229e71e6c76325938f794ac9fccb542778d2ae..9dc289a8d5765d7ffc5dc9bab5267b61559f0d02 100644 --- a/documentation/webdocs/markdowndocs/advanced features-ch.md +++ b/documentation/webdocs/markdowndocs/advanced features-ch.md @@ -67,7 +67,7 @@ TDengine内嵌支持轻量级的消息订阅与推送服务。使用系统提供 TDengine的订阅与推送服务的状态是客户端维持,TDengine服务器并不维持。因此如果应用重启,从哪个时间点开始获取最新数据,由应用决定。 -订阅相关API请见 [连接器](https://www.taosdata.com/cn/documentation/connector/)。 +订阅相关API文档请见 [C/C++ 数据订阅接口](https://www.taosdata.com/cn/documentation/connector/#C/C++-%E6%95%B0%E6%8D%AE%E8%AE%A2%E9%98%85%E6%8E%A5%E5%8F%A3),《[TDEngine中订阅的用途和用法](https://www.taosdata.com/blog/2020/02/12/1277.html)》则以一个示例详细介绍了这些API的用法。 ## 缓存 (Cache) TDengine采用时间驱动缓存管理策略(First-In-First-Out,FIFO),又称为写驱动的缓存管理机制。这种策略有别于读驱动的数据缓存模式(Least-Recent-Use,LRU),直接将最近写入的数据保存在系统的缓存中。当缓存达到临界值的时候,将最早的数据批量写入磁盘。一般意义上来说,对于物联网数据的使用,用户最为关心最近产生的数据,即当前状态。TDengine充分利用了这一特性,将最近到达的(当前状态)数据保存在缓存中。 diff --git a/documentation/webdocs/markdowndocs/advanced features.md b/documentation/webdocs/markdowndocs/advanced features.md index e841a5a6a531c94908b0f027d2d3e808d40ecac5..3eae454da59eec4aa7ea73741dcde2e882cb0821 100644 --- a/documentation/webdocs/markdowndocs/advanced features.md +++ b/documentation/webdocs/markdowndocs/advanced features.md @@ -62,7 +62,7 @@ Time series data is a sequence of data points over time. Inside a table, the dat To reduce the development complexity and improve data consistency, TDengine provides the pub/sub functionality. To publish a message, you simply insert a record into a table. Compared with popular messaging tool Kafka, you subscribe to a table or a SQL query statement, instead of a topic. Once new data points arrive, TDengine will notify the application. The process is just like Kafka. -The detailed API will be introduced in the [connectors](https://www.taosdata.com/en/documentation/connector/) section. +The API documentation is at [C/C++ subscription API](https://www.taosdata.com/en/documentation/connector/#C/C++-subscription-API) section, and you can find more information from blog article (only Chinese version at present) [The usage of subscription](https://www.taosdata.com/blog/2020/02/12/1277.html). ##Caching TDengine allocates a fixed-size buffer in memory, the newly arrived data will be written into the buffer first. Every device or table gets one or more memory blocks. For typical IoT scenarios, the hot data shall always be newly arrived data, they are more important for timely analysis. Based on this observation, TDengine manages the cache blocks in First-In-First-Out strategy. If no enough space in the buffer, the oldest data will be saved into hard disk first, then be overwritten by newly arrived data. TDengine also guarantees every device can keep at least one block of data in the buffer. diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 712ccbffbfba1a23b347a38008b1b381dc248698..8d4951b89131bdbc40fd3276010dd0103672e57f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,11 +4,11 @@ PROJECT(TDengine) ADD_SUBDIRECTORY(os) ADD_SUBDIRECTORY(util) ADD_SUBDIRECTORY(rpc) -ADD_SUBDIRECTORY(client) -ADD_SUBDIRECTORY(kit) -ADD_SUBDIRECTORY(modules) -ADD_SUBDIRECTORY(sdb) -ADD_SUBDIRECTORY(mnode) -ADD_SUBDIRECTORY(dnode) -ADD_SUBDIRECTORY(vnode) -ADD_SUBDIRECTORY(connector/jdbc) +#ADD_SUBDIRECTORY(client) +#ADD_SUBDIRECTORY(kit) +#ADD_SUBDIRECTORY(plugins) +#ADD_SUBDIRECTORY(sdb) +#ADD_SUBDIRECTORY(mnode) +#ADD_SUBDIRECTORY(dnode) +#ADD_SUBDIRECTORY(vnode) +#ADD_SUBDIRECTORY(connector/jdbc) diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index 92d6b61eb2473c790c967a4a0091e233de84b8fa..55d6cb251caa839e85b023ea24193d1074262ba4 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -4,8 +4,10 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(jni) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) +INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) -AUX_SOURCE_DIRECTORY(./src SRC) +AUX_SOURCE_DIRECTORY(src SRC) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 3720a09459af4d0275a5c409c5eb5057dcfe3c81..ec839e25758793f7ada6a12684274903f6bfdfa0 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -30,6 +30,7 @@ extern "C" { #include "taosdef.h" #include "tsqlfunction.h" #include "tutil.h" +#include "trpc.h" #define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \ (res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows) @@ -324,6 +325,7 @@ typedef struct _sql_obj { int64_t stime; uint32_t queryId; void * thandle; + SRpcIpSet ipSet; void * pStream; void * pSubscription; char * sqlstr; @@ -371,12 +373,6 @@ typedef struct _sstream { struct _sstream *prev, *next; } SSqlStream; -typedef struct { - char numOfIps; - uint32_t ip[TSDB_MAX_MGMT_IPS]; - char ipstr[TSDB_MAX_MGMT_IPS][TSDB_IPv4ADDR_LEN]; -} SIpStrList; - // tscSql API int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); @@ -461,7 +457,7 @@ extern void * tscQhandle; extern int tscKeepConn[]; extern int tsInsertHeadSize; extern int tscNumOfThreads; -extern SIpStrList tscMgmtIpList; +extern SRpcIpSet tscMgmtIpList; typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 94ebaefd369975c6873e6dce7ff36c3b513ad91e..a70a314298ca870e927ae1f7ff3629d0b925747d 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -25,6 +25,7 @@ #include "tscSQLParser.h" #include "tutil.h" #include "tnote.h" +#include "tsched.h" static void tscProcessFetchRow(SSchedMsg *pMsg); static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows); diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index a7a774b3a8ce71a608d15ec9a71f931a7a59a06a..c6abfabf93370bac4995b6d8f6384b4fc98a6273 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -279,7 +279,7 @@ void tscKillConnection(STscObj *pObj) { SSqlObj *pSql = pObj->sqlList; while (pSql) { - taosStopRpcConn(pSql->thandle); + //taosStopRpcConn(pSql->thandle); pSql = pSql->next; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 60c597b38528535a13d6c30142bc212affceb50c..e5499fd04f68cec22cc548db27cdb27c4da61717 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -900,7 +900,7 @@ bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField) { return false; } - if (pTagField->type < TSDB_DATA_TYPE_BOOL && pTagField->type > TSDB_DATA_TYPE_NCHAR) { + if ((pTagField->type < TSDB_DATA_TYPE_BOOL) || (pTagField->type > TSDB_DATA_TYPE_NCHAR)) { invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); return false; } @@ -1655,7 +1655,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt SSchema* pSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta); // functions can not be applied to tags - if (index.columnIndex >= pMeterMetaInfo->pMeterMeta->numOfColumns) { + if ((index.columnIndex >= pMeterMetaInfo->pMeterMeta->numOfColumns) || (index.columnIndex < 0)) { return invalidSqlErrMsg(pQueryInfo->msg, msg6); } @@ -5663,4 +5663,4 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { bool hasDefaultQueryTimeRange(SQueryInfo *pQueryInfo) { return (pQueryInfo->stime == 0 && pQueryInfo->etime == INT64_MAX) || (pQueryInfo->stime == INT64_MAX && pQueryInfo->etime == 0); -} \ No newline at end of file +} diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1fe4ba2979a507b7044c581450db1cb0fbdb01ed..fe385c30770742a45be4746b73b5c7f43c8aeea4 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -31,10 +31,14 @@ #define TSC_MGMT_VNODE 999 -SIpStrList tscMgmtIpList; +SRpcIpSet tscMgmtIpList; int tsMasterIndex = 0; int tsSlaveIndex = 1; +//temp +SRpcIpSet tscMgmtIpSet; +SRpcIpSet tscDnodeIpSet; + int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); @@ -53,7 +57,7 @@ void tscPrintMgmtIp() { tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps); } else { for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) { - tscTrace("mgmt index:%d ip:%s", i, tscMgmtIpList.ipstr[i]); + tscTrace("mgmt index:%d ip:%s", i, tscMgmtIpList.ipStr[i]); } } } @@ -62,7 +66,7 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) { tscMgmtIpList.numOfIps = pIpList->numOfIps; if (memcmp(tscMgmtIpList.ip, pIpList->ip, pIpList->numOfIps * 4) != 0) { for (int i = 0; i < pIpList->numOfIps; ++i) { - tinet_ntoa(tscMgmtIpList.ipstr[i], pIpList->ip[i]); + tinet_ntoa(tscMgmtIpList.ipStr[i], pIpList->ip[i]); tscMgmtIpList.ip[i] = pIpList->ip[i]; } tscTrace("cluster mgmt IP list:"); @@ -73,9 +77,9 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) { void tscSetMgmtIpListFromEdge() { if (tscMgmtIpList.numOfIps != 2) { tscMgmtIpList.numOfIps = 2; - strcpy(tscMgmtIpList.ipstr[0], tsMasterIp); + strcpy(tscMgmtIpList.ipStr[0], tsMasterIp); tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); - strcpy(tscMgmtIpList.ipstr[1], tsMasterIp); + strcpy(tscMgmtIpList.ipStr[1], tsMasterIp); tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); tscTrace("edge mgmt IP list:"); tscPrintMgmtIp(); @@ -168,7 +172,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { if (tscShouldFreeHeatBeat(pObj->pHb)) { tscTrace("%p free HB object and release connection, pConn:%p", pObj, pObj->pHb->thandle); - taosCloseRpcConn(pObj->pHb->thandle); + //taosCloseRpcConn(pObj->pHb->thandle); tscFreeSqlObj(pObj->pHb); tscCloseTscObj(pObj); @@ -178,6 +182,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { tscProcessSql(pObj->pHb); } + void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) { STscObj *pTscObj = pSql->pTscObj; if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) { @@ -187,23 +192,24 @@ void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) { if (pSql->cmd.command > TSDB_SQL_READ && pSql->index == 0) pSql->index = 1; void *thandle = taosGetConnFromCache(tscConnCache, tscMgmtIpList.ip[pSql->index], TSC_MGMT_VNODE, pTscObj->user); - if (thandle == NULL) { - SRpcConnInit connInit; - memset(&connInit, 0, sizeof(connInit)); - connInit.cid = 0; - connInit.sid = 0; - connInit.meterId = pSql->pTscObj->user; - connInit.peerId = 0; - connInit.shandle = pTscMgmtConn; - connInit.ahandle = pSql; - connInit.peerPort = tsMgmtShellPort; - connInit.spi = 1; - connInit.encrypt = 0; - connInit.secret = pSql->pTscObj->pass; - - connInit.peerIp = tscMgmtIpList.ipstr[pSql->index]; - thandle = taosOpenRpcConn(&connInit, pCode); - } + +// if (thandle == NULL) { +// SRpcConnInit connInit; +// memset(&connInit, 0, sizeof(connInit)); +// connInit.cid = 0; +// connInit.sid = 0; +// connInit.meterId = pSql->pTscObj->user; +// connInit.peerId = 0; +// connInit.shandle = pTscMgmtConn; +// connInit.ahandle = pSql; +// connInit.peerPort = tsMgmtShellPort; +// connInit.spi = 1; +// connInit.encrypt = 0; +// connInit.secret = pSql->pTscObj->pass; +// +// connInit.peerIp = tscMgmtIpList.ipstr[pSql->index]; +// thandle = taosOpenRpcConn(&connInit, pCode); +// } pSql->thandle = thandle; pSql->ip = tscMgmtIpList.ip[pSql->index]; @@ -267,23 +273,23 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { void *thandle = taosGetConnFromCache(tscConnCache, pVPeersDesc[pSql->index].ip, pVPeersDesc[pSql->index].vnode, pTscObj->user); - if (thandle == NULL) { - SRpcConnInit connInit; - tinet_ntoa(ipstr, pVPeersDesc[pSql->index].ip); - memset(&connInit, 0, sizeof(connInit)); - connInit.cid = vidIndex; - connInit.sid = 0; - connInit.spi = 0; - connInit.encrypt = 0; - connInit.meterId = pSql->pTscObj->user; - connInit.peerId = htonl((pVPeersDesc[pSql->index].vnode << TSDB_SHELL_VNODE_BITS)); - connInit.shandle = pVnodeConn; - connInit.ahandle = pSql; - connInit.peerIp = ipstr; - connInit.peerPort = tsVnodeShellPort; - thandle = taosOpenRpcConn(&connInit, pCode); - vidIndex = (vidIndex + 1) % tscNumOfThreads; - } +// if (thandle == NULL) { +// SRpcConnInit connInit; +// tinet_ntoa(ipstr, pVPeersDesc[pSql->index].ip); +// memset(&connInit, 0, sizeof(connInit)); +// connInit.cid = vidIndex; +// connInit.sid = 0; +// connInit.spi = 0; +// connInit.encrypt = 0; +// connInit.meterId = pSql->pTscObj->user; +// connInit.peerId = htonl((pVPeersDesc[pSql->index].vnode << TSDB_SHELL_VNODE_BITS)); +// connInit.shandle = pVnodeConn; +// connInit.ahandle = pSql; +// connInit.peerIp = ipstr; +// connInit.peerPort = tsVnodeShellPort; +// thandle = taosOpenRpcConn(&connInit, pCode); +// vidIndex = (vidIndex + 1) % tscNumOfThreads; +// } pSql->thandle = thandle; pSql->ip = pVPeersDesc[pSql->index].ip; @@ -291,6 +297,8 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { tscTrace("%p vnode:%d ip:%p index:%d is picked up, pConn:%p", pSql, pVPeersDesc[pSql->index].vnode, pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle); + //TODO fetch from vpeerdesc + pSql->ipSet = tscMgmtIpSet; break; } @@ -326,25 +334,29 @@ int tscSendMsgToServer(SSqlObj *pSql) { size_t totalLen = pSql->cmd.payloadLen + tsRpcHeadSize + sizeof(STaosDigest); // the memory will be released by taosProcessResponse, so no memory leak here - char *buf = malloc(totalLen); - if (NULL == buf) { + char *pStart = rpcMallocCont(pSql->cmd.payloadLen); + if (NULL == pStart) { tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - memcpy(buf, pSql->cmd.payload, totalLen); + memcpy(pStart, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]); - char *pStart = taosBuildReqHeader(pSql->thandle, pSql->cmd.msgType, buf); if (pStart) { /* * this SQL object may be released by other thread due to the completion of this query even before the log * is dumped to log file. So the signature needs to be kept in a local variable. */ uint64_t signature = (uint64_t)pSql->signature; - if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, buf); + //if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, pStart); + + int ret; + if (pSql->cmd.command < TSDB_SQL_MGMT) + ret = rpcSendRequest(pTscMgmtConn, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql); + else + ret = rpcSendRequest(pVnodeConn, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql); - int ret = taosSendMsgToPeerH(pSql->thandle, pStart, pSql->cmd.payloadLen, pSql); if (ret >= 0) { code = 0; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 85b9462660b98a49eac6d6c4d79ff3f6f0bcc7c9..ddccbccb23a96e675e8c1ddbc7ba4afcee7357f8 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -64,15 +64,15 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const } if (ip && ip[0]) { - tscMgmtIpList.numOfIps = 4; - strcpy(tscMgmtIpList.ipstr[0], ip); + tscMgmtIpList.numOfIps = 3; + strcpy(tscMgmtIpList.ipStr[0], ip); tscMgmtIpList.ip[0] = inet_addr(ip); - strcpy(tscMgmtIpList.ipstr[1], ip); - tscMgmtIpList.ip[1] = inet_addr(ip); - strcpy(tscMgmtIpList.ipstr[2], tsMasterIp); - tscMgmtIpList.ip[2] = inet_addr(tsMasterIp); - strcpy(tscMgmtIpList.ipstr[3], tsSecondIp); - tscMgmtIpList.ip[3] = inet_addr(tsSecondIp); + strcpy(tscMgmtIpList.ipStr[1], tsMasterIp); + tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); + strcpy(tscMgmtIpList.ipStr[2], tsSecondIp); + tscMgmtIpList.ip[2] = inet_addr(tsSecondIp); + tscMgmtIpList.index = 0; + tscMgmtIpList.port = tsMgmtShellPort; } pObj = (STscObj *)malloc(sizeof(STscObj)); @@ -1034,7 +1034,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t pCmd->command = TSDB_SQL_MULTI_META; pCmd->count = 0; - int code = TSDB_CODE_INVALID_METER_ID; + int code = TSDB_CODE_INVALID_TABLE_ID; char *str = (char *)tblNameList; SQueryInfo *pQueryInfo = NULL; @@ -1070,7 +1070,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t // Check if the table name available or not if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_INVALID_METER_ID; + code = TSDB_CODE_INVALID_TABLE_ID; sprintf(pCmd->payload, "table name is invalid"); return code; } @@ -1080,7 +1080,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t } if (++pCmd->count > TSDB_MULTI_METERMETA_MAX_NUM) { - code = TSDB_CODE_INVALID_METER_ID; + code = TSDB_CODE_INVALID_TABLE_ID; sprintf(pCmd->payload, "tables over the max number"); return code; } diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 1b5b55352ebca20ec8d4496b76072bba32139568..79b524be0f8fd9b18847e03e9420941c6ec0c05c 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -19,7 +19,7 @@ #include "ttime.h" #include "ttimer.h" #include "tutil.h" - +#include "tsched.h" #include "taosmsg.h" #include "tscUtil.h" #include "tsclient.h" diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index e07f459cf4575a0b3c7ae0b2df6668cb2dd9c108..b9c0ae2018d5a32a0bfaf326b4e946caee0d597c 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -24,8 +24,9 @@ #include "ttime.h" #include "ttimer.h" #include "tutil.h" - +#include "tsched.h" #include "tsclient.h" + // global, not configurable void * pVnodeConn; void * pVMeterConn; @@ -94,18 +95,17 @@ void taos_init_imp() { if (tsTscEnableRecordSql != 0) { taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note"); } - - tscMgmtIpList.numOfIps = 2; - strcpy(tscMgmtIpList.ipstr[0], tsMasterIp); - tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); - strcpy(tscMgmtIpList.ipstr[1], tsMasterIp); - tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); + tscMgmtIpList.index = 0; + tscMgmtIpList.port = tsMgmtShellPort; + tscMgmtIpList.numOfIps = 1; + strcpy(tscMgmtIpList.ipStr[0], tsMasterIp); + tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); if (tsSecondIp[0]) { - tscMgmtIpList.numOfIps = 3; - strcpy(tscMgmtIpList.ipstr[2], tsSecondIp); - tscMgmtIpList.ip[2] = inet_addr(tsSecondIp); + tscMgmtIpList.numOfIps = 2; + strcpy(tscMgmtIpList.ipStr[1], tsSecondIp); + tscMgmtIpList.ip[1] = inet_addr(tsSecondIp); } tscInitMsgs(); @@ -132,42 +132,23 @@ void taos_init_imp() { rpcInit.label = "TSC-vnode"; rpcInit.numOfThreads = tscNumOfThreads; rpcInit.fp = tscProcessMsgFromServer; - rpcInit.bits = 20; - rpcInit.numOfChanns = tscNumOfThreads; - rpcInit.sessionsPerChann = tsMaxVnodeConnections / tscNumOfThreads; - rpcInit.idMgmt = TAOS_ID_FREE; - rpcInit.noFree = 0; + rpcInit.sessions = tsMaxVnodeConnections; rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C(); - rpcInit.qhandle = tscQhandle; - pVnodeConn = taosOpenRpc(&rpcInit); + pVnodeConn = rpcOpen(&rpcInit); if (pVnodeConn == NULL) { tscError("failed to init connection to vnode"); return; } - for (int i = 0; i < tscNumOfThreads; ++i) { - int retVal = taosOpenRpcChann(pVnodeConn, i, rpcInit.sessionsPerChann); - if (0 != retVal) { - tError("TSC-vnode, failed to open rpc chann"); - taosCloseRpc(pVnodeConn); - return; - } - } - memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = tsLocalIp; rpcInit.localPort = 0; rpcInit.label = "TSC-mgmt"; rpcInit.numOfThreads = 1; rpcInit.fp = tscProcessMsgFromServer; - rpcInit.bits = 20; - rpcInit.numOfChanns = 1; - rpcInit.sessionsPerChann = tsMaxMgmtConnections; - rpcInit.idMgmt = TAOS_ID_FREE; - rpcInit.noFree = 0; + rpcInit.sessions = tsMaxMgmtConnections; rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C(); - rpcInit.qhandle = tscQhandle; - pTscMgmtConn = taosOpenRpc(&rpcInit); + pTscMgmtConn = rpcOpen(&rpcInit); if (pTscMgmtConn == NULL) { tscError("failed to init connection to mgmt"); return; @@ -183,7 +164,7 @@ void taos_init_imp() { if (tscCacheHandle == NULL) tscCacheHandle = taosInitDataCache(tsMaxMeterConnections / 2, tscTmr, refreshTime); - tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, taosCloseRpcConn, tscTmr, tsShellActivityTimer * 1000); + tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, NULL/*taosCloseRpcConn*/, tscTmr, tsShellActivityTimer * 1000); initialized = 1; tscTrace("client is initialized successfully"); diff --git a/src/connector/grafana/tdengine/package-lock.json b/src/connector/grafana/tdengine/package-lock.json index 9401fbcd0ae0c67a10ff365d4015c977ba43ade1..f8d2df1150d0c7c2581de4ceb61e5647d52ab871 100644 --- a/src/connector/grafana/tdengine/package-lock.json +++ b/src/connector/grafana/tdengine/package-lock.json @@ -3992,9 +3992,9 @@ } }, "yarn": { - "version": "1.21.1", - "resolved": "https://registry.npmjs.org/yarn/-/yarn-1.21.1.tgz", - "integrity": "sha512-dQgmJv676X/NQczpbiDtc2hsE/pppGDJAzwlRiADMTvFzYbdxPj2WO4PcNyriSt2c4jsCMpt8UFRKHUozt21GQ==" + "version": "1.22.0", + "resolved": "https://registry.npmjs.org/yarn/-/yarn-1.22.0.tgz", + "integrity": "sha512-KMHP/Jq53jZKTY9iTUt3dIVl/be6UPs2INo96+BnZHLKxYNTfwMmlgHTaMWyGZoO74RI4AIFvnWhYrXq2USJkg==" } } } diff --git a/src/connector/grafana/tdengine/package.json b/src/connector/grafana/tdengine/package.json index 8e542bef2647aa6bb4fe2f404665a4ce9c707345..0eb7a76be6cfccd81f680f179c8e59499690201b 100644 --- a/src/connector/grafana/tdengine/package.json +++ b/src/connector/grafana/tdengine/package.json @@ -39,7 +39,7 @@ }, "dependencies": { "lodash": "^4.17.13", - "yarn": "^1.21.1" + "yarn": "^1.22.0" }, "homepage": "https://github.com/taosdata/TDengine/tree/develop/src/connector/grafana/tdengine" } diff --git a/src/connector/grafana/tdengine/yarn.lock b/src/connector/grafana/tdengine/yarn.lock index b2b869cb1ac179ece8d5e673bbabfe0856f9bd6e..fe7e8122ec371f66811c235be81f2e1276ccd5b1 100644 --- a/src/connector/grafana/tdengine/yarn.lock +++ b/src/connector/grafana/tdengine/yarn.lock @@ -2957,7 +2957,7 @@ yargs@~3.10.0: decamelize "^1.0.0" window-size "0.1.0" -yarn@^1.21.1: - version "1.21.1" - resolved "https://registry.yarnpkg.com/yarn/-/yarn-1.21.1.tgz#1d5da01a9a03492dc4a5957befc1fd12da83d89c" - integrity sha512-dQgmJv676X/NQczpbiDtc2hsE/pppGDJAzwlRiADMTvFzYbdxPj2WO4PcNyriSt2c4jsCMpt8UFRKHUozt21GQ== +yarn@^1.22.0: + version "1.22.0" + resolved "https://registry.yarnpkg.com/yarn/-/yarn-1.22.0.tgz#acf82906e36bcccd1ccab1cfb73b87509667c881" + integrity sha512-KMHP/Jq53jZKTY9iTUt3dIVl/be6UPs2INo96+BnZHLKxYNTfwMmlgHTaMWyGZoO74RI4AIFvnWhYrXq2USJkg== diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBError.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBError.java index 7d5b02606ea0b05069785331fe223503a205e46b..bb1b2afd07f4d9478786d462188ca3c43334785f 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBError.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBError.java @@ -48,7 +48,7 @@ public enum TSDBError { TSDB_CODE_INVALID_VALUE(24, "invalid value"), TSDB_CODE_REDIRECT(25, "service not available"), TSDB_CODE_ALREADY_THERE(26, "already there"), - TSDB_CODE_INVALID_METER_ID(27, "invalid meter ID"), + TSDB_CODE_INVALID_TABLE_ID(27, "invalid meter ID"), TSDB_CODE_INVALID_SQL(28, "invalid SQL"), // this message often comes with additional info which will vary based on the specific error situation TSDB_CODE_NETWORK_UNAVAIL(29, "failed to connect to server"), TSDB_CODE_INVALID_MSG_LEN(30, "invalid msg len"), diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 30bc8a2ad35a87085419fd64f076522bd8ce6674..9530b6a4cba6fe1ad3eb7c271fa4957b5e1159d6 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -22,19 +22,14 @@ extern "C" { #include #include -#include "tsched.h" -#include "dnode.h" - -int dnodeProcessCreateTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj); -int dnodeProcessRemoveTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj); - -void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); - -extern void *dmQhandle; +void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); void dnodeSendVpeerCfgMsg(int32_t vnode); void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid); +extern int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType); +extern int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code); + #ifdef __cplusplus } #endif diff --git a/src/dnode/inc/dnodeModule.h b/src/dnode/inc/dnodeModule.h index 7967fa956e03d6c187ba5eecb17f496f92d5f6e4..57cfe9621c63d7171377ce740c28439a2a0b9a32 100644 --- a/src/dnode/inc/dnodeModule.h +++ b/src/dnode/inc/dnodeModule.h @@ -56,6 +56,8 @@ void dnodeAllocModules(); int32_t dnodeInitModules(); void dnodeCleanUpModules(); +extern void (*dnodeStartModules)(); + #ifdef __cplusplus } #endif diff --git a/src/dnode/inc/dnodeSystem.h b/src/dnode/inc/dnodeSystem.h index 7e94a642e78aa34305952164b2d24948d29e9408..15dcdbb39dcf80e2d9b312dea55050711e18cb39 100644 --- a/src/dnode/inc/dnodeSystem.h +++ b/src/dnode/inc/dnodeSystem.h @@ -22,8 +22,6 @@ extern "C" { #include #include -#include -#include "dnode.h" typedef enum { TSDB_DNODE_RUN_STATUS_INITIALIZE, @@ -39,6 +37,7 @@ extern void (*dnodeParseParameterK)(); extern int32_t tsMaxQueues; extern void ** tsRpcQhandle; extern void *tsQueryQhandle; +extern void *tsDnodeMgmtQhandle; int32_t dnodeInitSystem(); void dnodeCleanUpSystem(); diff --git a/src/dnode/inc/dnodeVnodeMgmt.h b/src/dnode/inc/dnodeVnodeMgmt.h index 9fee09166bdfd97f24fee07e7255c5b84dc26e25..321ac3083bd429c549b5195077f7eeb0313f3502 100644 --- a/src/dnode/inc/dnodeVnodeMgmt.h +++ b/src/dnode/inc/dnodeVnodeMgmt.h @@ -21,9 +21,10 @@ extern "C" { #endif #include - +#include #include "taosdef.h" #include "taosmsg.h" +#include "tstatus.h" /* * Open all Vnodes in the local data directory @@ -38,34 +39,34 @@ int32_t dnodeCleanupVnodes(); /* * Check if vnode already exists */ -int32_t dnodeCheckVnodeExist(int vid); +bool dnodeCheckVnodeExist(int32_t vid); /* * Create vnode with specified configuration and open it + * if exist, config it */ -//tsdb_repo_t* dnodeCreateVnode(int vid, SVnodeCfg *cfg); -void* dnodeCreateVnode(int vid, SVnodeCfg *cfg); +int32_t dnodeCreateVnode(int32_t vnode, SVPeersMsg *cfg); /* - * Modify vnode configuration information + * Remove vnode from local repository */ -int32_t dnodeConfigVnode(int vid, SVnodeCfg *cfg); +int32_t dnodeDropVnode(int32_t vnode); /* - * Modify vnode replication information + * Get the vnode object that has been opened */ -int32_t dnodeConfigVnodePeers(int vid/*, SVpeerCfgMsg *cfg*/); +//tsdb_repo_t* dnodeGetVnode(int vid); +void* dnodeGetVnode(int vid); /* - * Remove vnode from local repository + * get the status of vnode */ -int32_t dnodeDropVnode(int vid); +EVnodeStatus dnodeGetVnodeStatus(int32_t vnode); /* - * Get the vnode object that has been opened + * Check if vnode already exists, and table exist in this vnode */ -//tsdb_repo_t* dnodeGetVnode(int vid); -void* dnodeGetVnode(int vid); +bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeWrite.h b/src/dnode/inc/dnodeWrite.h index 6f74dee879b441f657792b789df46124eaedc7c0..78af597132ba635af7f61898e9b8350b240107c8 100644 --- a/src/dnode/inc/dnodeWrite.h +++ b/src/dnode/inc/dnodeWrite.h @@ -35,45 +35,26 @@ extern "C" { void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)); /* - * Create noraml table with specified configuration and open it + * Create table with specified configuration and open it + * if table already exist, update its schema and tag */ -int32_t dnodeCreateNormalTable(SCreateNormalTableMsg *table); +int32_t dnodeCreateTable(SDCreateTableMsg *table); /* - * Create stream table with specified configuration and open it - */ -int32_t dnodeCreateStreamTable(SCreateStreamTableMsg *table); - -/* - * Create child table with specified configuration and open it - */ -int32_t dnodeCreateChildTable(SCreateChildTableMsg *table); - -/* - * Modify normal table configuration information - * - */ -int32_t dnodeAlterNormalTable(SCreateNormalTableMsg *table); - -/* - * Modify stream table configuration information + * Remove table from local repository */ -int32_t dnodeAlterStreamTable(SCreateStreamTableMsg *table); +int32_t dnodeDropTable(int32_t vnode, int32_t sid, uint64_t uid); /* - * Modify child table configuration information + * Create stream + * if stream already exist, update it */ -int32_t dnodeAlterChildTable(SCreateChildTableMsg *table); +int32_t dnodeCreateStream(SAlterStreamMsg *stream); /* * Remove all child tables of supertable from local repository */ -int32_t dnodeDropSuperTable(int vid, int sid, int64_t uid); - -/* - * Remove table from local repository - */ -int32_t dnodeDropTable(int vid, int sid, int64_t uid); +int32_t dnodeDropSuperTable(uint64_t stableUid); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 72728f90addcedbee0b4f525a72d51b79f15a521..60786b27f4120940ada62fef1e0f7e9f963616bf 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -14,97 +14,57 @@ */ #define _DEFAULT_SOURCE - #include "os.h" - -#include "dnode.h" -#include "dnodeSystem.h" -#include "dnodeMgmt.h" - #include "taosmsg.h" #include "tlog.h" #include "trpc.h" #include "tsched.h" #include "tsystem.h" +#include "mnode.h" +#include "dnode.h" +#include "dnodeSystem.h" +#include "dnodeMgmt.h" +#include "dnodeWrite.h" +#include "dnodeVnodeMgmt.h" -SMgmtObj mgmtObj; -extern uint64_t tsCreatedTime; - -int dnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj); -int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen); -int dnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj); -int dnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj); -int dnodeProcessTableCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj); -int dnodeProcessDnodeCfgRequest(char *cont, int contLen, SMgmtObj *pMgmtObj); -int dnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj); -void vnodeUpdateHeadFile(int vnode, int oldTables, int newTables); -void vnodeOpenVnode(int vnode); -void vnodeCleanUpOneVnode(int vnode); - -static int (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, void *pConn); +static int32_t (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn); static void dnodeInitProcessShellMsg(); -char *taosBuildRspMsgToMnodeWithSizeImp(SMgmtObj *pObj, char type, int size) { - char *pStart = (char *)malloc(size); - if (pStart == NULL) { - return NULL; - } +void dnodeSendMsgToMnodeImpFp(SSchedMsg *sched) { + int8_t msgType = *(int8_t *) (sched->msg - sizeof(int32_t) - sizeof(int8_t)); + int32_t contLen = *(int32_t *) (sched->msg - sizeof(int8_t)); + int8_t *pCont = sched->msg; + void *pConn = NULL; - *pStart = type; - return pStart + 1; + mgmtProcessMsgFromDnode(pCont, contLen, msgType, pConn); + rpcFreeCont(sched->msg); } -char *(*taosBuildRspMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size) = taosBuildRspMsgToMnodeWithSizeImp; -char *taosBuildReqMsgToMnodeWithSizeImp(SMgmtObj *pObj, char type, int size) { - char *pStart = (char *)malloc(size); - if (pStart == NULL) { - return NULL; - } +int32_t dnodeSendMsgToMnodeImp(int8_t *pCont, int32_t contLen, int8_t msgType) { + dTrace("msg:%s is sent to mnode", taosMsg[msgType]); + *(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType; + *(int32_t *) (pCont - sizeof(int8_t)) = contLen; - *pStart = type; - return pStart + 1; -} -char *(*taosBuildReqMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size) = taosBuildReqMsgToMnodeWithSizeImp; + SSchedMsg schedMsg = {0}; + schedMsg.fp = dnodeSendMsgToMnodeImpFp; + schedMsg.msg = pCont; -char *taosBuildRspMsgToMnodeImp(SMgmtObj *pObj, char type) { - return taosBuildRspMsgToMnodeWithSizeImp(pObj, type, 256); -} -char *(*taosBuildRspMsgToMnode)(SMgmtObj *pObj, char type) = taosBuildRspMsgToMnodeImp; + taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); -char *taosBuildReqMsgToMnodeImp(SMgmtObj *pObj, char type) { - return taosBuildReqMsgToMnodeWithSizeImp(pObj, type, 256); + return TSDB_CODE_SUCCESS; } -char *(*taosBuildReqMsgToMnode)(SMgmtObj *pObj, char type) = taosBuildReqMsgToMnodeImp; -int taosSendMsgToMnodeImp(SMgmtObj *pObj, char *msg, int msgLen) { - dTrace("msg:%s is sent to mnode", taosMsg[(uint8_t)(*(msg-1))]); +int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType) = dnodeSendMsgToMnodeImp; - /* - * Lite version has no message header, so minus one - */ - SSchedMsg schedMsg; - schedMsg.fp = mgmtProcessMsgFromDnodeSpec; - schedMsg.msg = msg - 1; - schedMsg.ahandle = NULL; - schedMsg.thandle = NULL; - taosScheduleTask(dmQhandle, &schedMsg); +int32_t dnodeSendSimpleRspToMnodeImp(void *pConn, int32_t msgType, int32_t code) { + int8_t *pCont = rpcMallocCont(sizeof(int32_t)); + *(int32_t *) pCont = code; - return 0; + dnodeSendMsgToMnodeImp(pCont, sizeof(int32_t), msgType); + return TSDB_CODE_SUCCESS; } -int (*taosSendMsgToMnode)(SMgmtObj *pObj, char *msg, int msgLen) = taosSendMsgToMnodeImp; - -int taosSendSimpleRspToMnodeImp(SMgmtObj *pObj, char rsptype, char code) { - char *pStart = taosBuildRspMsgToMnode(0, rsptype); - if (pStart == NULL) { - return 0; - } - *pStart = code; - taosSendMsgToMnode(0, pStart, code); - - return 0; -} -int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code) = taosSendSimpleRspToMnodeImp; +int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code) = dnodeSendSimpleRspToMnodeImp; int32_t dnodeInitMgmtImp() { dnodeInitProcessShellMsg(); @@ -117,478 +77,138 @@ void dnodeInitMgmtIpImp() {} void (*dnodeInitMgmtIp)() = dnodeInitMgmtIpImp; -void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched) { - int32_t msgType = *(int32_t*)(sched->msg); - int8_t *content = sched->msg + sizeof(int32_t); - - dTrace("msg:%s is received from mgmt", taosMsg[msgType]); - dnodeDistributeMsgFromMgmt(content, 0, msgType, NULL); - - free(sched->msg); -} - -void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn) { +void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { dError("invalid msg type:%d", msgType); } else { if (dnodeProcessShellMsgFp[msgType]) { - (*dnodeProcessShellMsgFp[msgType])(pConn, contLen, pConn); + (*dnodeProcessShellMsgFp[msgType])(pCont, contLen, msgType, pConn); } else { dError("%s is not processed", taosMsg[msgType]); } } } -int dnodeProcessTableCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) { - int code = *pMsg; +int32_t dnodeProcessTableCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { + int32_t code = htonl(*((int32_t *) pCont)); - if (code == 0) { - vnodeProcessCreateMeterMsg(pMsg + 1, msgLen - 1); + if (code == TSDB_CODE_SUCCESS) { + SDCreateTableMsg *table = (SDCreateTableMsg *) (pCont + sizeof(int32_t)); + return dnodeCreateTable(table); + } else if (code == TSDB_CODE_INVALID_TABLE_ID) { + SDRemoveTableMsg *table = (SDRemoveTableMsg *) (pCont + sizeof(int32_t)); + int32_t vnode = htonl(table->vnode); + int32_t sid = htonl(table->sid); + uint64_t uid = htobe64(table->uid); + dError("vnode:%d, sid:%d table is not configured, remove it", vnode, sid); + return dnodeDropTable(vnode, sid, uid); } else { - STaosRsp *pRsp; - pRsp = (STaosRsp *)pMsg; - int32_t *pint = (int32_t *)pRsp->more; - int vnode = htonl(*pint); - int sid = htonl(*(pint + 1)); - dError("vid:%d, sid:%d, code:%d, meter is not configured, remove it", vnode, sid, code); - int ret = vnodeRemoveMeterObj(vnode, sid); - dTrace("vid:%d, sid:%d, meter delete ret:%d", vnode, sid, ret); + dError("code:%d invalid message", code); + return TSDB_CODE_INVALID_MSG; } - - return 0; } -int dnodeProcessCreateTableRequest(char *pMsg, int msgLen, SMgmtObj *pObj) { - SCreateMsg *pCreate; - int code = 0; - int vid; - SVnodeObj * pVnode; - - pCreate = (SCreateMsg *)pMsg; - vid = htons(pCreate->vnode); - - if (vid >= TSDB_MAX_VNODES || vid < 0) { - dError("vid:%d, vnode is out of range", vid); - code = TSDB_CODE_INVALID_VNODE_ID; - goto _over; - } - - pVnode = vnodeList + vid; - if (pVnode->cfg.maxSessions <= 0) { - dError("vid:%d, not activated", vid); - code = TSDB_CODE_NOT_ACTIVE_VNODE; - goto _over; - } - -// if (pVnode->syncStatus == TSDB_VN_SYNC_STATUS_SYNCING) { -// code = vnodeSaveCreateMsgIntoQueue(pVnode, pMsg, msgLen); -// dTrace("vid:%d, create msg is saved into sync queue", vid); -// } else { - code = vnodeProcessCreateMeterMsg(pMsg, msgLen); -// } - -_over: - taosSendSimpleRspToMnode(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE_RSP, code); - +int32_t dnodeProcessCreateTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { + SDCreateTableMsg *table = (SDCreateTableMsg *) pCont; + int32_t code = dnodeCreateTable(table); + dnodeSendSimpleRspToMnode(pConn, msgType + 1, code); return code; } -int dnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj) { - SAlterStreamMsg *pAlter; - int code = 0; - int vid, sid; - SVnodeObj * pVnode; - - pAlter = (SAlterStreamMsg *)pMsg; - vid = htons(pAlter->vnode); - sid = htonl(pAlter->sid); - - if (vid >= TSDB_MAX_VNODES || vid < 0) { - dError("vid:%d, vnode is out of range", vid); - code = TSDB_CODE_INVALID_VNODE_ID; - goto _over; - } - - pVnode = vnodeList + vid; - if (pVnode->cfg.maxSessions <= 0 || pVnode->pCachePool == NULL) { - dError("vid:%d is not activated yet", pAlter->vnode); - code = TSDB_CODE_NOT_ACTIVE_VNODE; - goto _over; - } - - if (pAlter->sid >= pVnode->cfg.maxSessions || pAlter->sid < 0) { - dError("vid:%d sid:%d uid:%" PRIu64 ", sid is out of range", pAlter->vnode, pAlter->sid, pAlter->uid); - code = TSDB_CODE_INVALID_TABLE_ID; - goto _over; - } - - SMeterObj *pMeterObj = vnodeList[vid].meterList[sid]; - if (pMeterObj == NULL || sid != pMeterObj->sid || vid != pMeterObj->vnode) { - dError("vid:%d sid:%d, not active table", vid, sid); - code = TSDB_CODE_NOT_ACTIVE_TABLE; - goto _over; - } - - pMeterObj->status = pAlter->status; - if (pMeterObj->status == 1) { - if (pAlter->stime > pMeterObj->lastKey) // starting time can be specified - pMeterObj->lastKey = pAlter->stime; - vnodeCreateStream(pMeterObj); - } else { - vnodeRemoveStream(pMeterObj); - } - - vnodeSaveMeterObjToFile(pMeterObj); - -_over: - taosSendSimpleRspToMnode(pObj, TSDB_MSG_TYPE_ALTER_STREAM_RSP, code); - +int32_t dnodeProcessAlterStreamRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { + SAlterStreamMsg *stream = (SAlterStreamMsg *) pCont; + int32_t code = dnodeCreateStream(stream); + dnodeSendSimpleRspToMnode(pConn, msgType + 1, code); return code; } -int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen) { - int code; - SMeterObj * pObj = NULL; - SConnSec connSec; - SCreateMsg *pCreate = (SCreateMsg *)pMsg; - - pCreate->vnode = htons(pCreate->vnode); - pCreate->sid = htonl(pCreate->sid); - pCreate->lastCreate = htobe64(pCreate->lastCreate); - pCreate->timeStamp = htobe64(pCreate->timeStamp); - - if (pCreate->vnode >= TSDB_MAX_VNODES || pCreate->vnode < 0) { - dError("vid:%d is out of range", pCreate->vnode); - code = TSDB_CODE_INVALID_VNODE_ID; - goto _create_over; - } - - SVnodeObj *pVnode = vnodeList + pCreate->vnode; - if (pVnode->pCachePool == NULL) { - dError("vid:%d is not activated yet", pCreate->vnode); - vnodeSendVpeerCfgMsg(pCreate->vnode); - code = TSDB_CODE_NOT_ACTIVE_VNODE; - goto _create_over; - } - - if (pCreate->sid >= pVnode->cfg.maxSessions || pCreate->sid < 0) { - dError("vid:%d sid:%d id:%s, sid is out of range", pCreate->vnode, pCreate->sid, pCreate->meterId); - code = TSDB_CODE_INVALID_TABLE_ID; - goto _create_over; - } - - pCreate->numOfColumns = htons(pCreate->numOfColumns); - if (pCreate->numOfColumns <= 0) { - dTrace("vid:%d sid:%d id:%s, numOfColumns is out of range", pCreate->vnode, pCreate->sid, pCreate->meterId); - code = TSDB_CODE_OTHERS; - goto _create_over; - } - - pCreate->sqlLen = htons(pCreate->sqlLen); - pObj = (SMeterObj *)calloc(1, sizeof(SMeterObj) + pCreate->sqlLen + 1); - if (pObj == NULL) { - dError("vid:%d sid:%d id:%s, no memory to allocate meterObj", pCreate->vnode, pCreate->sid, pCreate->meterId); - code = TSDB_CODE_NO_RESOURCE; - goto _create_over; - } - - /* - * memory alignment may cause holes in SColumn struct which are not assigned any value - * therefore, we could not use memcmp to compare whether two SColumns are equal or not. - * So, we need to set the memory to 0 when allocating memory. - */ - pObj->schema = (SColumn *)calloc(1, pCreate->numOfColumns * sizeof(SColumn)); - - pObj->vnode = pCreate->vnode; - pObj->sid = pCreate->sid; - pObj->uid = pCreate->uid; - memcpy(pObj->meterId, pCreate->meterId, TSDB_TABLE_ID_LEN); - pObj->numOfColumns = pCreate->numOfColumns; - pObj->timeStamp = pCreate->timeStamp; - pObj->sversion = htonl(pCreate->sversion); - pObj->maxBytes = 0; - - for (int i = 0; i < pObj->numOfColumns; ++i) { - pObj->schema[i].type = pCreate->schema[i].type; - pObj->schema[i].bytes = htons(pCreate->schema[i].bytes); - pObj->schema[i].colId = htons(pCreate->schema[i].colId); - pObj->bytesPerPoint += pObj->schema[i].bytes; - if (pObj->maxBytes < pObj->schema[i].bytes) pObj->maxBytes = pObj->schema[i].bytes; - } - - if (pCreate->sqlLen > 0) { - pObj->sqlLen = pCreate->sqlLen; - pObj->pSql = ((char *)pObj) + sizeof(SMeterObj); - memcpy(pObj->pSql, (char *)pCreate->schema + pCreate->numOfColumns * sizeof(SMColumn), pCreate->sqlLen); - pObj->pSql[pCreate->sqlLen] = 0; - } - - pObj->pointsPerFileBlock = pVnode->cfg.rowsInFileBlock; - - if (sizeof(TSKEY) != pObj->schema[0].bytes) { - dError("key length is not matched, required key length:%d", sizeof(TSKEY)); - code = TSDB_CODE_OTHERS; - goto _create_over; - } - - // security info shall be saved here - connSec.spi = pCreate->spi; - connSec.encrypt = pCreate->encrypt; - memcpy(connSec.secret, pCreate->secret, TSDB_KEY_LEN); - memcpy(connSec.cipheringKey, pCreate->cipheringKey, TSDB_KEY_LEN); - - code = vnodeCreateMeterObj(pObj, &connSec); - -_create_over: - if (code != TSDB_CODE_SUCCESS) { - dTrace("vid:%d sid:%d id:%s, failed to create meterObj", pCreate->vnode, pCreate->sid, pCreate->meterId); - tfree(pObj); - } +int32_t dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { + SDRemoveTableMsg *table = (SDRemoveTableMsg *) pCont; + int32_t vnode = htonl(table->vnode); + int32_t sid = htonl(table->sid); + uint64_t uid = htobe64(table->uid); + dPrint("vnode:%d, sid:%d table is not configured, remove it", vnode, sid); + int32_t code = dnodeDropTable(vnode, sid, uid); + dnodeSendSimpleRspToMnode(pConn, msgType + 1, code); return code; } -int dnodeProcessRemoveTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) { - SMeterObj * pObj; - SRemoveMeterMsg *pRemove; - int code = 0; - - pRemove = (SRemoveMeterMsg *)pMsg; - pRemove->vnode = htons(pRemove->vnode); - pRemove->sid = htonl(pRemove->sid); - - if (pRemove->vnode < 0 || pRemove->vnode >= TSDB_MAX_VNODES) { - dWarn("vid:%d sid:%d, already removed", pRemove->vnode, pRemove->sid); - goto _remove_over; - } - - if (vnodeList[pRemove->vnode].meterList == NULL) goto _remove_over; - - pObj = vnodeList[pRemove->vnode].meterList[pRemove->sid]; - if (pObj == NULL) goto _remove_over; - - if (memcmp(pObj->meterId, pRemove->meterId, TSDB_TABLE_ID_LEN) != 0) { - dWarn("vid:%d sid:%d id:%s, remove ID:%s, meter ID not matched", pObj->vnode, pObj->sid, pObj->meterId, - pRemove->meterId); - goto _remove_over; - } - - if (vnodeRemoveMeterObj(pRemove->vnode, pRemove->sid) == TSDB_CODE_ACTION_IN_PROGRESS) { - code = TSDB_CODE_ACTION_IN_PROGRESS; - goto _remove_over; - } - - dTrace("vid:%d sid:%d id:%s, meterObj is removed", pRemove->vnode, pRemove->sid, pRemove->meterId); +int32_t dnodeProcessVPeerCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { + int32_t code = htonl(*((int32_t *) pCont)); -_remove_over: - taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_REMOVE_RSP, code); - return 0; -} - -int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { - SVPeersMsg *pMsg = (SVPeersMsg *)msg; - int i, vnode; - - vnode = htonl(pMsg->vnode); - if (vnode >= TSDB_MAX_VNODES) { - dError("vid:%d, vnode is out of range", vnode); - return -1; - } - - if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_CREATING) { - dTrace("vid:%d, vnode is still under creating", vnode); - return 0; - } - - SVnodeCfg *pCfg = &pMsg->cfg; - pCfg->vgId = htonl(pCfg->vgId); - pCfg->maxSessions = htonl(pCfg->maxSessions); - pCfg->cacheBlockSize = htonl(pCfg->cacheBlockSize); - pCfg->cacheNumOfBlocks.totalBlocks = htonl(pCfg->cacheNumOfBlocks.totalBlocks); - pCfg->daysPerFile = htonl(pCfg->daysPerFile); - pCfg->daysToKeep1 = htonl(pCfg->daysToKeep1); - pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2); - pCfg->daysToKeep = htonl(pCfg->daysToKeep); - pCfg->commitTime = htonl(pCfg->commitTime); - pCfg->blocksPerMeter = htons(pCfg->blocksPerMeter); - pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); - - if (pCfg->replications > 0) { - dPrint("vid:%d, vpeer cfg received, replica:%d session:%d, vnodeList replica:%d session:%d, acct:%s db:%s", - vnode, pCfg->replications, pCfg->maxSessions, vnodeList[vnode].cfg.replications, vnodeList[vnode].cfg.maxSessions, - pCfg->acct, pCfg->db); - for (i = 0; i < pCfg->replications; ++i) { - pMsg->vpeerDesc[i].vnode = htonl(pMsg->vpeerDesc[i].vnode); - pMsg->vpeerDesc[i].ip = htonl(pMsg->vpeerDesc[i].ip); - dPrint("vid:%d, vpeer:%d ip:0x%x vid:%d ", vnode, i, pMsg->vpeerDesc[i].ip, pMsg->vpeerDesc[i].vnode); - } - } - - if (vnodeList[vnode].cfg.maxSessions == 0) { - dPrint("vid:%d, vnode is empty", vnode); - if (pCfg->maxSessions > 0) { - if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_OFFLINE) { - dPrint("vid:%d, status:%s, start to create vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); - return vnodeCreateVnode(vnode, pCfg, pMsg->vpeerDesc); - } else { - dPrint("vid:%d, status:%s, cannot preform create vnode operation", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); - return TSDB_CODE_INVALID_VNODE_STATUS; - } - } + if (code == TSDB_CODE_SUCCESS) { + SVPeersMsg *vpeer = (SVPeersMsg *) (pCont + sizeof(int32_t)); + int32_t vnode = htonl(vpeer->vnode); + return dnodeCreateVnode(vnode, vpeer); + } else if (code == TSDB_CODE_INVALID_VNODE_ID) { + SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) (pCont + sizeof(int32_t)); + int32_t vnode = htonl(vpeer->vnode); + dError("vnode:%d, not exist, remove it", vnode); + return dnodeDropVnode(vnode); } else { - dPrint("vid:%d, vnode is not empty", vnode); - if (pCfg->maxSessions > 0) { - if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_DELETING) { - dPrint("vid:%d, status:%s, wait vnode delete finished", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); - } else { - dPrint("vid:%d, status:%s, start to update vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); - - if (pCfg->maxSessions != vnodeList[vnode].cfg.maxSessions) { - vnodeCleanUpOneVnode(vnode); - } - - vnodeConfigVPeers(vnode, pCfg->replications, pMsg->vpeerDesc); - vnodeSaveVnodeCfg(vnode, pCfg, pMsg->vpeerDesc); - - /* - if (pCfg->maxSessions != vnodeList[vnode].cfg.maxSessions) { - vnodeUpdateHeadFile(vnode, vnodeList[vnode].cfg.maxSessions, pCfg->maxSessions); - vnodeList[vnode].cfg.maxSessions = pCfg->maxSessions; - vnodeOpenVnode(vnode); - } - */ - } - return 0; - } else { - dPrint("vid:%d, status:%s, start to delete vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); - vnodeRemoveVnode(vnode); - } + dError("code:%d invalid message", code); + return TSDB_CODE_INVALID_MSG; } - - return 0; } -int dnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj) { - STaosRsp *pRsp; - - pRsp = (STaosRsp *)msg; - - if (pRsp->code == 0) { - vnodeProcessVPeerCfg(pRsp->more, msgLen - sizeof(STaosRsp), pMgmtObj); - } else { - int32_t *pint = (int32_t *)pRsp->more; - int vnode = htonl(*pint); - if (vnode < TSDB_MAX_VNODES && vnodeList[vnode].lastKey != 0) { - dError("vnode:%d not configured, it shall be empty, code:%d", vnode, pRsp->code); - vnodeRemoveVnode(vnode); - } else { - dError("vnode:%d is invalid, code:%d", vnode, pRsp->code); - } - } - - return 0; -} - -int dnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { - int code = 0; - - code = vnodeProcessVPeerCfg(msg, msgLen, pMgmtObj); - - char * pStart; - STaosRsp * pRsp; - SVPeersMsg *pVPeersMsg = (SVPeersMsg *)msg; - - pStart = taosBuildRspMsgToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP); - if (pStart == NULL) return -1; - - pRsp = (STaosRsp *)pStart; - pRsp->code = code; - memcpy(pRsp->more, pVPeersMsg->cfg.db, TSDB_DB_NAME_LEN); +int32_t dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { + SVPeersMsg *vpeer = (SVPeersMsg *) pCont; + int32_t vnode = htonl(vpeer->vnode); - msgLen = sizeof(STaosRsp) + TSDB_DB_NAME_LEN; - taosSendMsgToMnode(pMgmtObj, pStart, msgLen); + dPrint("vnode:%d, start to config", vnode); + int32_t code = dnodeCreateVnode(vnode, vpeer); + dnodeSendSimpleRspToMnode(pConn, msgType + 1, code); return code; } -int dnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) { - SFreeVnodeMsg *pFree; +int32_t dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { + SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) pCont; + int32_t vnode = htonl(vpeer->vnode); - pFree = (SFreeVnodeMsg *)pMsg; - pFree->vnode = htons(pFree->vnode); + dPrint("vnode:%d, remove it", vnode); - if (pFree->vnode < 0 || pFree->vnode >= TSDB_MAX_VNODES) { - dWarn("vid:%d, out of range", pFree->vnode); - return -1; - } - - dTrace("vid:%d, receive free vnode message", pFree->vnode); - int32_t code = vnodeRemoveVnode(pFree->vnode); - assert(code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS); + int32_t code = dnodeDropVnode(vnode); + dnodeSendSimpleRspToMnode(pConn, msgType + 1, code); - taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP, code); - return 0; + return code; } -int dnodeProcessDnodeCfgRequest(char *cont, int contLen, SMgmtObj *pMgmtObj) { - SCfgMsg *pCfg = (SCfgMsg *)cont; - - int code = tsCfgDynamicOptions(pCfg->config); - - taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_CFG_RSP, code); - - return 0; +int32_t dnodeProcessDnodeCfgRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { + SCfgMsg *pCfg = (SCfgMsg *)pCont; + int32_t code = tsCfgDynamicOptions(pCfg->config); + dnodeSendSimpleRspToMnode(pConn, msgType + 1, code); + return code; } void dnodeSendVpeerCfgMsg(int32_t vnode) { - char * pMsg, *pStart; - int msgLen; - SVpeerCfgMsg *pCfg; - SMgmtObj * pObj = &mgmtObj; - - pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_VNODE_CFG); - if (pStart == NULL) return; - pMsg = pStart; - - pCfg = (SVpeerCfgMsg *)pMsg; - pCfg->vnode = htonl(vnode); - pMsg += sizeof(SVpeerCfgMsg); + SVpeerCfgMsg *cfg = (SVpeerCfgMsg *) rpcMallocCont(sizeof(SVpeerCfgMsg)); + if (cfg == NULL) { + return; + } - msgLen = pMsg - pStart; - taosSendMsgToMnode(pObj, pStart, msgLen); + cfg->vnode = htonl(vnode); + dnodeSendMsgToMnode((int8_t*)cfg, sizeof(SVpeerCfgMsg), TSDB_MSG_TYPE_VNODE_CFG); } void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { - char * pMsg, *pStart; - int msgLen; - SMeterCfgMsg *pCfg; - SMgmtObj * pObj = &mgmtObj; - - pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_TABLE_CFG); - if (pStart == NULL) return -1; - pMsg = pStart; - - pCfg = (SMeterCfgMsg *)pMsg; - pCfg->vnode = htonl(vnode); - pCfg->sid = htonl(sid); - pMsg += sizeof(SMeterCfgMsg); - - msgLen = pMsg - pStart; - return taosSendMsgToMnode(pObj, pStart, msgLen); -} + SMeterCfgMsg *cfg = (SMeterCfgMsg *) rpcMallocCont(sizeof(SMeterCfgMsg)); + if (cfg == NULL) { + return; + } + cfg->vnode = htonl(vnode); + dnodeSendMsgToMnode((int8_t*)cfg, sizeof(SMeterCfgMsg), TSDB_MSG_TYPE_TABLE_CFG); +} void dnodeInitProcessShellMsg() { - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE] = dnodeProcessCreateTableRequest; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE] = dnodeProcessCreateTableRequest; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE] = dnodeProcessCreateTableRequest; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE] = dnodeProcessRemoveTableRequest; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE] = dnodeProcessRemoveTableRequest; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE] = dnodeProcessRemoveTableRequest; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_VPEERS] = dnodeProcessVPeersMsg; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_FREE_VNODE] = dnodeProcessFreeVnodeRequest; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_VPEERS] = dnodeProcessVPeersMsg; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_FREE_VNODE] = dnodeProcessFreeVnodeRequest; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp; } \ No newline at end of file diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index 370a96ac61b7f3abf5bee08891898f8534b62590..7166d8c8a219f98746fbe646b2e4acdd7bd7fc28 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -17,41 +17,41 @@ #include "os.h" #include "tlog.h" #include "tglobalcfg.h" +#include "mnode.h" +#include "http.h" +#include "monitor.h" #include "dnodeModule.h" #include "dnodeSystem.h" -#include "monitorSystem.h" -#include "httpSystem.h" -#include "mgmtSystem.h" -SModule tsModule[TSDB_MOD_MAX] = {0}; -uint32_t tsModuleStatus = 0; +SModule tsModule[TSDB_MOD_MAX] = {0}; +uint32_t tsModuleStatus = 0; void dnodeAllocModules() { - tsModule[TSDB_MOD_MGMT].name = "mgmt"; - tsModule[TSDB_MOD_MGMT].initFp = mgmtInitSystem; - tsModule[TSDB_MOD_MGMT].cleanUpFp = mgmtCleanUpSystem; - tsModule[TSDB_MOD_MGMT].startFp = mgmtStartSystem; - tsModule[TSDB_MOD_MGMT].stopFp = mgmtStopSystem; - tsModule[TSDB_MOD_MGMT].num = tsNumOfMPeers; - tsModule[TSDB_MOD_MGMT].curNum = 0; + tsModule[TSDB_MOD_MGMT].name = "mgmt"; + tsModule[TSDB_MOD_MGMT].initFp = mgmtInitSystem; + tsModule[TSDB_MOD_MGMT].cleanUpFp = mgmtCleanUpSystem; + tsModule[TSDB_MOD_MGMT].startFp = mgmtStartSystem; + tsModule[TSDB_MOD_MGMT].stopFp = mgmtStopSystem; + tsModule[TSDB_MOD_MGMT].num = tsNumOfMPeers; + tsModule[TSDB_MOD_MGMT].curNum = 0; tsModule[TSDB_MOD_MGMT].equalVnodeNum = tsMgmtEqualVnodeNum; - tsModule[TSDB_MOD_HTTP].name = "http"; - tsModule[TSDB_MOD_HTTP].initFp = httpInitSystem; - tsModule[TSDB_MOD_HTTP].cleanUpFp = httpCleanUpSystem; - tsModule[TSDB_MOD_HTTP].startFp = httpStartSystem; - tsModule[TSDB_MOD_HTTP].stopFp = httpStopSystem; - tsModule[TSDB_MOD_HTTP].num = (tsEnableHttpModule == 1) ? -1 : 0; - tsModule[TSDB_MOD_HTTP].curNum = 0; + tsModule[TSDB_MOD_HTTP].name = "http"; + tsModule[TSDB_MOD_HTTP].initFp = httpInitSystem; + tsModule[TSDB_MOD_HTTP].cleanUpFp = httpCleanUpSystem; + tsModule[TSDB_MOD_HTTP].startFp = httpStartSystem; + tsModule[TSDB_MOD_HTTP].stopFp = httpStopSystem; + tsModule[TSDB_MOD_HTTP].num = (tsEnableHttpModule == 1) ? -1 : 0; + tsModule[TSDB_MOD_HTTP].curNum = 0; tsModule[TSDB_MOD_HTTP].equalVnodeNum = 0; - tsModule[TSDB_MOD_MONITOR].name = "monitor"; - tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem; - tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem; - tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem; - tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem; - tsModule[TSDB_MOD_MONITOR].num = (tsEnableMonitorModule == 1) ? -1 : 0; - tsModule[TSDB_MOD_MONITOR].curNum = 0; + tsModule[TSDB_MOD_MONITOR].name = "monitor"; + tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem; + tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem; + tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem; + tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem; + tsModule[TSDB_MOD_MONITOR].num = (tsEnableMonitorModule == 1) ? -1 : 0; + tsModule[TSDB_MOD_MONITOR].curNum = 0; tsModule[TSDB_MOD_MONITOR].equalVnodeNum = 0; } @@ -71,7 +71,7 @@ void dnodeCleanUpModules() { } void dnodeProcessModuleStatus(uint32_t status) { - if (tsDnodeRunStatus) { + if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { return; } @@ -112,7 +112,7 @@ int32_t dnodeInitModules() { } } - return 0; + return TSDB_CODE_SUCCESS; } void dnodeStartModulesImp() { @@ -128,4 +128,5 @@ void dnodeStartModulesImp() { (*tsModule[TSDB_MOD_MGMT].cleanUpFp)(); } } + void (*dnodeStartModules)() = dnodeStartModulesImp; diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 5173d8624d828247f9171265f22308f341d8cae4..b73d51780e83bb4bf4e90d6f2346d6a7e006b0ab 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -17,6 +17,7 @@ #include "os.h" #include "taoserror.h" #include "tlog.h" +#include "tsched.h" #include "dnode.h" #include "dnodeRead.h" #include "dnodeSystem.h" @@ -32,14 +33,14 @@ void dnodeQueryData(SQueryMeterMsg *pQuery, void *pConn, void (*callback)(int32_ } static void dnodeExecuteRetrieveData(SSchedMsg *pSched) { - SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *)pSched->msg; + //SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *)pSched->msg; SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle; void *pConn = pSched->ahandle; //examples int32_t code = TSDB_CODE_INVALID_QHANDLE; void *pQInfo = NULL; //get from pConn - (*callback)(code, NULL, pConn); + (*callback)(code, pQInfo, pConn); //TODO build response here @@ -47,8 +48,8 @@ static void dnodeExecuteRetrieveData(SSchedMsg *pSched) { } void dnodeRetrieveData(SRetrieveMeterMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp) { - int8_t *msg = malloc(sizeof(pRetrieve)); - memcpy(msg, pRetrieve, sizeof(pRetrieve)); + int8_t *msg = malloc(sizeof(SRetrieveMeterMsg)); + memcpy(msg, pRetrieve, sizeof(SRetrieveMeterMsg)); SSchedMsg schedMsg; schedMsg.msg = msg; @@ -62,6 +63,8 @@ int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveMeterRsp *retrievalRsp) { return 0; } -int32_t dnodeGetRetrieveDataSize(void *pQInfo) {} +int32_t dnodeGetRetrieveDataSize(void *pQInfo) { + return 0; +} diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index c873f11eaa4694eb351c0235556518381fbd922e..50a0f81503625aa7b35ffa4ca31d5b3e4058e534 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -29,7 +29,7 @@ #include "dnodeRead.h" #include "dnodeSystem.h" #include "dnodeShell.h" -#include "dnodeUtil.h" +#include "dnodeVnodeMgmt.h" #include "dnodeWrite.h" static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn); @@ -40,19 +40,19 @@ static void *tsDnodeShellServer = NULL; static int32_t tsDnodeQueryReqNum = 0; static int32_t tsDnodeSubmitReqNum = 0; -void dnodeProcessMsgFromShell(int32_t msgType, void *pCont, int32_t contLen, void *handle, int32_t index) { +void* dnodeProcessMsgFromShell(int8_t msgType, void *pCont, int32_t contLen, void *handle, int32_t index) { assert(handle != NULL); if (pCont == NULL || contLen == 0) { dnodeFreeQInfo(handle); dTrace("conn:%p, free query info", handle); - return; + return NULL; } if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { rpcSendSimpleRsp(handle, TSDB_CODE_NOT_READY); dTrace("conn:%p, query msg is ignored since dnode not running", handle); - return; + return NULL; } dTrace("conn:%p, msg:%s is received", handle, taosMsg[msgType]); @@ -66,6 +66,8 @@ void dnodeProcessMsgFromShell(int32_t msgType, void *pCont, int32_t contLen, voi } else { dError("conn:%p, msg:%s is not processed", handle, taosMsg[msgType]); } + + return NULL; } int32_t dnodeInitShell() { diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 3d77cf2e83118adb2f432ad30e15773b96ddeb76..0f009f0f11aad8ac734a6bc7d1a297275746243c 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -19,6 +19,7 @@ #include "taoserror.h" #include "tcrc32c.h" #include "tlog.h" +#include "tsched.h" #include "ttime.h" #include "ttimer.h" #include "tutil.h" @@ -53,7 +54,7 @@ static int32_t dnodeInitTmrCtl(); void *tsStatusTimer = NULL; void *vnodeTmrCtrl; void **tsRpcQhandle; -void *dmQhandle; +void *tsDnodeMgmtQhandle; void *tsQueryQhandle; int32_t tsVnodePeers = TSDB_VNODES_SUPPORT - 1; int32_t tsMaxQueues; @@ -298,7 +299,7 @@ static int32_t dnodeInitRpcQHandle() { tsRpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode"); } - dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt"); + tsDnodeMgmtQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt"); return 0; } diff --git a/src/dnode/src/dnodeUtil.c b/src/dnode/src/dnodeUtil.c deleted file mode 100644 index b1d2fc0cb6ec33279bc7543821561b4f2c58563e..0000000000000000000000000000000000000000 --- a/src/dnode/src/dnodeUtil.c +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "dnodeUtil.h" - -EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) { - return TSDB_VN_STATUS_MASTER; -} - -bool dnodeCheckVnodeExist(int32_t vnode) { - return true; -} diff --git a/src/dnode/src/dnodeVnodeMgmt.c b/src/dnode/src/dnodeVnodeMgmt.c index 4a2012aa35cb2219d62e1bbf8c1ce696ddbc76ab..ee92991203000bb641c2db317a01c32443a90836 100644 --- a/src/dnode/src/dnodeVnodeMgmt.c +++ b/src/dnode/src/dnodeVnodeMgmt.c @@ -14,4 +14,20 @@ */ #define _DEFAULT_SOURCE +#include "os.h" +#include "tlog.h" +#include "taoserror.h" +#include "dnodeVnodeMgmt.h" + +EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) { + return TSDB_VN_STATUS_MASTER; +} + +bool dnodeCheckVnodeExist(int32_t vnode) { + return true; +} + +bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) { + return true; +} diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 588ba49483d8aecd4369996ee98e63805c8d26a5..59ede2711b15ff1c1384d05f408e4837d95d80e8 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -18,6 +18,7 @@ #include "taoserror.h" #include "tlog.h" #include "dnodeWrite.h" +#include "dnodeVnodeMgmt.h" void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)) { SShellSubmitRspMsg result = {0}; @@ -32,35 +33,40 @@ void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShe //TODO: submit implementation } -int32_t dnodeCreateNormalTable(SCreateNormalTableMsg *table) { - return 0; +int32_t dnodeCreateTable(SDCreateTableMsg *table) { + return TSDB_CODE_SUCCESS; } -int32_t dnodeCreateStreamTable(SCreateStreamTableMsg *table) { - return 0; -} -int32_t dnodeCreateChildTable(SCreateChildTableMsg *table) { - return 0; +/* + * Remove table from local repository + */ +int32_t dnodeDropTable(int32_t vnode, int32_t sid, uint64_t uid) { + return TSDB_CODE_SUCCESS; } -int32_t dnodeAlterNormalTable(SCreateNormalTableMsg *table) { - return 0; -} +/* + * Create stream + * if stream already exist, update it + */ +int32_t dnodeCreateStream(SAlterStreamMsg *stream) { + int32_t vnode = htonl(stream->vnode); + int32_t sid = htonl(stream->sid); + uint64_t uid = htobe64(stream->uid); -int32_t dnodeAlterStreamTable(SCreateStreamTableMsg *table) { - return 0; -} + if (!dnodeCheckTableExist(vnode, sid, uid)) { + return TSDB_CODE_INVALID_TABLE; + } -int32_t dnodeAlterChildTable(SCreateChildTableMsg *table) { - return 0; -} + //TODO create or remove stream -int32_t dnodeDropSuperTable(int vid, int sid, int64_t uid) { return 0; } -int32_t dnodeDropTable(int vid, int sid, int64_t uid) { - return 0; +/* + * Remove all child tables of supertable from local repository + */ +int32_t dnodeDropSuperTable(uint64_t stableUid) { + return TSDB_CODE_SUCCESS; } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 27594686d44c2c0bbc1f21322c3ae449012a03c5..4aaf0e866ee47273a57617079821bda0a0246b22 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -22,7 +22,6 @@ extern "C" { #include #include -#include "tsched.h" typedef struct { int32_t queryReqNum; @@ -45,15 +44,16 @@ extern uint32_t tsRebootTime; extern void (*dnodeStartModules)(); extern void (*dnodeParseParameterK)(); extern int32_t (*dnodeCheckSystem)(); -extern char *(*taosBuildRspMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size); -extern char *(*taosBuildReqMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size); -extern char *(*taosBuildRspMsgToMnode)(SMgmtObj *pObj, char type); -extern char *(*taosBuildReqMsgToMnode)(SMgmtObj *pObj, char type); -extern int (*taosSendMsgToMnode)(SMgmtObj *pObj, char *msg, int msgLen); -extern int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code); extern void (*dnodeInitMgmtIp)(); extern int (*dnodeInitMgmt)(); +// dnodeMgmt +void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); +extern int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType); +extern int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code); + +// dnodeModule +extern void (*dnodeStartModules)(); // multilevelStorage extern int32_t (*dnodeInitStorage)(); @@ -61,9 +61,6 @@ extern void (*dnodeCleanupStorage)(); void dnodeCheckDataDirOpenned(const char* dir); -void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched); - - void dnodeLockVnodes(); void dnodeUnLockVnodes(); SDnodeStatisInfo dnodeGetStatisInfo(); diff --git a/src/inc/http.h b/src/inc/http.h index 9ef36560e13e91e8d8652a9233ef160c6ede72c2..0d4c386cbf1e784834019c5d75847ab20b7ce8e9 100644 --- a/src/inc/http.h +++ b/src/inc/http.h @@ -20,33 +20,13 @@ extern "C" { #endif -#include "tglobalcfg.h" -#include "tlog.h" - -#define httpError(...) \ - if (httpDebugFlag & DEBUG_ERROR) { \ - tprintf("ERROR HTP ", 255, __VA_ARGS__); \ - } -#define httpWarn(...) \ - if (httpDebugFlag & DEBUG_WARN) { \ - tprintf("WARN HTP ", httpDebugFlag, __VA_ARGS__); \ - } -#define httpTrace(...) \ - if (httpDebugFlag & DEBUG_TRACE) { \ - tprintf("HTP ", httpDebugFlag, __VA_ARGS__); \ - } -#define httpDump(...) \ - if (httpDebugFlag & DEBUG_TRACE) { \ - taosPrintLongString("HTP ", httpDebugFlag, __VA_ARGS__); \ - } -#define httpPrint(...) \ - { tprintf("HTP ", 255, __VA_ARGS__); } - -#define httpLError(...) taosLogError(__VA_ARGS__) httpError(__VA_ARGS__) -#define httpLWarn(...) taosLogWarn(__VA_ARGS__) httpWarn(__VA_ARGS__) -#define httpLPrint(...) taosLogPrint(__VA_ARGS__) httpPrint(__VA_ARGS__) +#include int32_t httpGetReqCount(); +int32_t httpInitSystem(); +int32_t httpStartSystem(); +void httpStopSystem(); +void httpCleanUpSystem(); #ifdef __cplusplus } diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 101df94805670749006bbba898b9b349cb3c503c..4f92e28d316b557582ae333ad2aac9cc1d0ee022 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -336,6 +336,14 @@ typedef struct { } SShowObj; +//mgmtSystem +int32_t mgmtStartSystem(); +void mgmtCleanUpSystem(); +void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); +extern int32_t (*mgmtInitSystem)(); +extern void (*mgmtStopSystem)(); +extern void (*mgmtCleanUpRedirect)(); + #ifdef __cplusplus } #endif diff --git a/src/modules/monitor/inc/monitorSystem.h b/src/inc/monitor.h similarity index 76% rename from src/modules/monitor/inc/monitorSystem.h rename to src/inc/monitor.h index e64b7fad6e7e38a6ad67f439fdceebc9e579e52e..bb63bf63a45c9235278bbad76a941ec29a24fc74 100644 --- a/src/modules/monitor/inc/monitorSystem.h +++ b/src/inc/monitor.h @@ -13,16 +13,22 @@ * along with this program. If not, see . */ -#ifndef __MONITOR_SYSTEM_H__ -#define __MONITOR_SYSTEM_H__ +#ifndef TDENGINE_MONITOR_H +#define TDENGINE_MONITOR_H -#include +#ifdef __cplusplus +extern "C" { +#endif -int monitorInitSystem(); -int monitorStartSystem(); +#include + +int32_t monitorInitSystem(); +int32_t monitorStartSystem(); void monitorStopSystem(); void monitorCleanUpSystem(); -extern void (*mnodeCountRequestFp)(SDnodeStatisInfo *info); +#ifdef __cplusplus +} +#endif -#endif \ No newline at end of file +#endif diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index dcc224ef05483e92c514e34ed8cb0122f0e902e3..f153a9a5739ebbae5c84b4a36065776472aa8083 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -26,7 +26,7 @@ extern "C" { #ifdef TAOS_ERROR_C #define TAOS_DEFINE_ERROR(name, mod, code, msg) {.val = (0x80000000 | ((mod)<<16) | (code)), .str=(msg)}, #else -#define TAOS_DEFINE_ERROR(name, mod, code, msg) const int32_t name = (0x80000000 | ((mod)<<16) | (code)); +#define TAOS_DEFINE_ERROR(name, mod, code, msg) static const int32_t name = (0x80000000 | ((mod)<<16) | (code)); #endif #define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code)) @@ -161,6 +161,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VNODE_STATUS, 0, 116, "invalid vnode s TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_LOCK_RESOURCES, 0, 117, "failed to lock resources") TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 118, "table id mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 119, "query cache erased") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, 0, 120, "invalid message") #ifdef TAOS_ERROR_C }; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 0b41a1b9499a9485df6c421f31816b5e404d5658..0c4fa999b9b358d67d338681ea2be85f394fe7c4 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -28,118 +28,97 @@ extern "C" { #include "taosdef.h" // message type -#define TSDB_MSG_TYPE_REG 1 -#define TSDB_MSG_TYPE_REG_RSP 2 -#define TSDB_MSG_TYPE_DNODE_SUBMIT 3 -#define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4 -#define TSDB_MSG_TYPE_DNODE_QUERY 5 -#define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6 -#define TSDB_MSG_TYPE_DNODE_RETRIEVE 7 -#define TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP 8 -#define TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE 9 -#define TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE_RSP 10 -#define TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE 11 -#define TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE_RSP 12 -#define TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE 13 -#define TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE_RSP 14 -#define TSDB_MSG_TYPE_DNODE_CREATE_SUPER_TABLE 15 -#define TSDB_MSG_TYPE_DNODE_CREATE_SUPER_TABLE_RSP 16 -#define TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE 17 -#define TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE_RSP 18 -#define TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE 19 -#define TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE_RSP 20 -#define TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE 21 -#define TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE_RSP 22 -#define TSDB_MSG_TYPE_DNODE_REMOVE_SUPER_TABLE 23 -#define TSDB_MSG_TYPE_DNODE_REMOVE_SUPER_TABLE_RSP 24 -#define TSDB_MSG_TYPE_DNODE_ALTER_CHILD_TABLE 25 -#define TSDB_MSG_TYPE_DNODE_ALTER_CHILD_TABLE_RSP 26 -#define TSDB_MSG_TYPE_DNODE_ALTER_NORMAL_TABLE 27 -#define TSDB_MSG_TYPE_DNODE_ALTER_NORMAL_TABLE_RSP 28 -#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_TABLE 29 -#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_TABLE_RSP 30 -#define TSDB_MSG_TYPE_DNODE_ALTER_SUPER_TABLE 31 -#define TSDB_MSG_TYPE_DNODE_ALTER_SUPER_TABLE_RSP 32 -#define TSDB_MSG_TYPE_DNODE_VPEERS 33 -#define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 34 -#define TSDB_MSG_TYPE_DNODE_FREE_VNODE 35 -#define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 36 -#define TSDB_MSG_TYPE_DNODE_CFG 37 -#define TSDB_MSG_TYPE_DNODE_CFG_RSP 38 -#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 39 -#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_RSP 40 - -#define TSDB_MSG_TYPE_SDB_SYNC 41 -#define TSDB_MSG_TYPE_SDB_SYNC_RSP 42 -#define TSDB_MSG_TYPE_SDB_FORWARD 43 -#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 44 -#define TSDB_MSG_TYPE_CONNECT 51 -#define TSDB_MSG_TYPE_CONNECT_RSP 52 -#define TSDB_MSG_TYPE_CREATE_ACCT 53 -#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 54 -#define TSDB_MSG_TYPE_ALTER_ACCT 55 -#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 56 -#define TSDB_MSG_TYPE_DROP_ACCT 57 -#define TSDB_MSG_TYPE_DROP_ACCT_RSP 58 -#define TSDB_MSG_TYPE_CREATE_USER 59 -#define TSDB_MSG_TYPE_CREATE_USER_RSP 60 -#define TSDB_MSG_TYPE_ALTER_USER 61 -#define TSDB_MSG_TYPE_ALTER_USER_RSP 62 -#define TSDB_MSG_TYPE_DROP_USER 63 -#define TSDB_MSG_TYPE_DROP_USER_RSP 64 -#define TSDB_MSG_TYPE_CREATE_MNODE 65 -#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 66 -#define TSDB_MSG_TYPE_DROP_MNODE 67 -#define TSDB_MSG_TYPE_DROP_MNODE_RSP 68 -#define TSDB_MSG_TYPE_CREATE_DNODE 69 -#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 70 -#define TSDB_MSG_TYPE_DROP_DNODE 71 -#define TSDB_MSG_TYPE_DROP_DNODE_RSP 72 -#define TSDB_MSG_TYPE_ALTER_DNODE 73 -#define TSDB_MSG_TYPE_ALTER_DNODE_RSP 74 -#define TSDB_MSG_TYPE_CREATE_DB 75 -#define TSDB_MSG_TYPE_CREATE_DB_RSP 76 -#define TSDB_MSG_TYPE_DROP_DB 77 -#define TSDB_MSG_TYPE_DROP_DB_RSP 78 -#define TSDB_MSG_TYPE_USE_DB 79 -#define TSDB_MSG_TYPE_USE_DB_RSP 80 -#define TSDB_MSG_TYPE_ALTER_DB 81 -#define TSDB_MSG_TYPE_ALTER_DB_RSP 82 -#define TSDB_MSG_TYPE_CREATE_TABLE 83 -#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 84 -#define TSDB_MSG_TYPE_DROP_TABLE 85 -#define TSDB_MSG_TYPE_DROP_TABLE_RSP 86 -#define TSDB_MSG_TYPE_ALTER_TABLE 87 -#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 88 -#define TSDB_MSG_TYPE_VNODE_CFG 89 -#define TSDB_MSG_TYPE_VNODE_CFG_RSP 90 -#define TSDB_MSG_TYPE_TABLE_CFG 91 -#define TSDB_MSG_TYPE_TABLE_CFG_RSP 92 -#define TSDB_MSG_TYPE_TABLE_META 93 -#define TSDB_MSG_TYPE_TABLE_META_RSP 94 -#define TSDB_MSG_TYPE_STABLE_META 95 -#define TSDB_MSG_TYPE_STABLE_META_RSP 96 -#define TSDB_MSG_TYPE_MULTI_TABLE_META 97 -#define TSDB_MSG_TYPE_MULTI_TABLE_META_RSP 98 -#define TSDB_MSG_TYPE_ALTER_STREAM 99 -#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 100 -#define TSDB_MSG_TYPE_SHOW 101 -#define TSDB_MSG_TYPE_SHOW_RSP 102 -#define TSDB_MSG_TYPE_CFG_MNODE 103 -#define TSDB_MSG_TYPE_CFG_MNODE_RSP 104 -#define TSDB_MSG_TYPE_KILL_QUERY 105 -#define TSDB_MSG_TYPE_KILL_QUERY_RSP 106 -#define TSDB_MSG_TYPE_KILL_STREAM 107 -#define TSDB_MSG_TYPE_KILL_STREAM_RSP 108 -#define TSDB_MSG_TYPE_KILL_CONNECTION 109 -#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 110 -#define TSDB_MSG_TYPE_HEARTBEAT 111 -#define TSDB_MSG_TYPE_HEARTBEAT_RSP 112 -#define TSDB_MSG_TYPE_STATUS 113 -#define TSDB_MSG_TYPE_STATUS_RSP 114 -#define TSDB_MSG_TYPE_GRANT 115 -#define TSDB_MSG_TYPE_GRANT_RSP 116 -#define TSDB_MSG_TYPE_MAX 117 +#define TSDB_MSG_TYPE_REG 1 +#define TSDB_MSG_TYPE_REG_RSP 2 +#define TSDB_MSG_TYPE_DNODE_SUBMIT 3 +#define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4 +#define TSDB_MSG_TYPE_DNODE_QUERY 5 +#define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6 +#define TSDB_MSG_TYPE_DNODE_RETRIEVE 7 +#define TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP 8 +#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE 9 +#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10 +#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11 +#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP 12 +#define TSDB_MSG_TYPE_DNODE_VPEERS 13 +#define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 14 +#define TSDB_MSG_TYPE_DNODE_FREE_VNODE 15 +#define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 16 +#define TSDB_MSG_TYPE_DNODE_CFG 17 +#define TSDB_MSG_TYPE_DNODE_CFG_RSP 18 +#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 19 +#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_RSP 20 +#define TSDB_MSG_TYPE_SDB_SYNC 21 +#define TSDB_MSG_TYPE_SDB_SYNC_RSP 22 +#define TSDB_MSG_TYPE_SDB_FORWARD 23 +#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 24 +#define TSDB_MSG_TYPE_CONNECT 31 +#define TSDB_MSG_TYPE_CONNECT_RSP 32 +#define TSDB_MSG_TYPE_CREATE_ACCT 33 +#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 34 +#define TSDB_MSG_TYPE_ALTER_ACCT 35 +#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 36 +#define TSDB_MSG_TYPE_DROP_ACCT 37 +#define TSDB_MSG_TYPE_DROP_ACCT_RSP 38 +#define TSDB_MSG_TYPE_CREATE_USER 39 +#define TSDB_MSG_TYPE_CREATE_USER_RSP 40 +#define TSDB_MSG_TYPE_ALTER_USER 41 +#define TSDB_MSG_TYPE_ALTER_USER_RSP 42 +#define TSDB_MSG_TYPE_DROP_USER 43 +#define TSDB_MSG_TYPE_DROP_USER_RSP 44 +#define TSDB_MSG_TYPE_CREATE_MNODE 45 +#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 46 +#define TSDB_MSG_TYPE_DROP_MNODE 47 +#define TSDB_MSG_TYPE_DROP_MNODE_RSP 48 +#define TSDB_MSG_TYPE_CREATE_DNODE 49 +#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 50 +#define TSDB_MSG_TYPE_DROP_DNODE 51 +#define TSDB_MSG_TYPE_DROP_DNODE_RSP 52 +#define TSDB_MSG_TYPE_ALTER_DNODE 53 +#define TSDB_MSG_TYPE_ALTER_DNODE_RSP 54 +#define TSDB_MSG_TYPE_CREATE_DB 55 +#define TSDB_MSG_TYPE_CREATE_DB_RSP 56 +#define TSDB_MSG_TYPE_DROP_DB 57 +#define TSDB_MSG_TYPE_DROP_DB_RSP 58 +#define TSDB_MSG_TYPE_USE_DB 59 +#define TSDB_MSG_TYPE_USE_DB_RSP 60 +#define TSDB_MSG_TYPE_ALTER_DB 61 +#define TSDB_MSG_TYPE_ALTER_DB_RSP 62 +#define TSDB_MSG_TYPE_CREATE_TABLE 63 +#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 64 +#define TSDB_MSG_TYPE_DROP_TABLE 65 +#define TSDB_MSG_TYPE_DROP_TABLE_RSP 66 +#define TSDB_MSG_TYPE_ALTER_TABLE 67 +#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 68 +#define TSDB_MSG_TYPE_VNODE_CFG 69 +#define TSDB_MSG_TYPE_VNODE_CFG_RSP 70 +#define TSDB_MSG_TYPE_TABLE_CFG 71 +#define TSDB_MSG_TYPE_TABLE_CFG_RSP 72 +#define TSDB_MSG_TYPE_TABLE_META 73 +#define TSDB_MSG_TYPE_TABLE_META_RSP 74 +#define TSDB_MSG_TYPE_STABLE_META 75 +#define TSDB_MSG_TYPE_STABLE_META_RSP 76 +#define TSDB_MSG_TYPE_MULTI_TABLE_META 77 +#define TSDB_MSG_TYPE_MULTI_TABLE_META_RSP 78 +#define TSDB_MSG_TYPE_ALTER_STREAM 79 +#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 80 +#define TSDB_MSG_TYPE_SHOW 81 +#define TSDB_MSG_TYPE_SHOW_RSP 82 +#define TSDB_MSG_TYPE_CFG_MNODE 83 +#define TSDB_MSG_TYPE_CFG_MNODE_RSP 84 +#define TSDB_MSG_TYPE_KILL_QUERY 85 +#define TSDB_MSG_TYPE_KILL_QUERY_RSP 86 +#define TSDB_MSG_TYPE_KILL_STREAM 87 +#define TSDB_MSG_TYPE_KILL_STREAM_RSP 88 +#define TSDB_MSG_TYPE_KILL_CONNECTION 89 +#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 90 +#define TSDB_MSG_TYPE_HEARTBEAT 91 +#define TSDB_MSG_TYPE_HEARTBEAT_RSP 92 +#define TSDB_MSG_TYPE_STATUS 93 +#define TSDB_MSG_TYPE_STATUS_RSP 94 +#define TSDB_MSG_TYPE_GRANT 95 +#define TSDB_MSG_TYPE_GRANT_RSP 96 +#define TSDB_MSG_TYPE_MAX 97 // IE type #define TSDB_IE_TYPE_SEC 1 @@ -313,72 +292,28 @@ typedef struct SSchema { short bytes; } SSchema; -typedef struct SMColumn { +typedef struct { int8_t type; int16_t colId; int16_t bytes; -} SMColumn; - -typedef struct { - int32_t size; - int8_t* data; -} SVariableMsg; - -typedef struct { - short vnode; - int32_t sid; - uint64_t uid; - char spi; - char encrypt; - char meterId[TSDB_TABLE_ID_LEN]; - char secret[TSDB_KEY_LEN]; - char cipheringKey[TSDB_KEY_LEN]; - uint64_t timeStamp; - uint64_t lastCreate; - short numOfColumns; - short sqlLen; // SQL string is after schema - char reserved[16]; - int32_t sversion; - SMColumn schema[]; -} SCreateMsg; +} SDTableColumn; typedef struct { int32_t vnode; int32_t sid; uint64_t uid; - char tableId[TSDB_TABLE_ID_LEN + 1]; - char superTableId[TSDB_TABLE_ID_LEN + 1]; - uint64_t createdTime; + uint64_t superTableUid; + int32_t tableType; int32_t sversion; int16_t numOfColumns; int16_t numOfTags; int32_t tagDataLen; - int8_t data[]; -} SCreateChildTableMsg; - -typedef struct { - int32_t vnode; - int32_t sid; - uint64_t uid; - char tableId[TSDB_TABLE_ID_LEN + 1]; + int32_t sqlDataLen; uint64_t createdTime; - int32_t sversion; - int16_t numOfColumns; - int8_t data[]; -} SCreateNormalTableMsg; - -typedef struct { - int32_t vnode; - int32_t sid; - uint64_t uid; char tableId[TSDB_TABLE_ID_LEN + 1]; - uint64_t createdTime; - int32_t sversion; - int16_t numOfColumns; - int32_t sqlLen; + char superTableId[TSDB_TABLE_ID_LEN + 1]; int8_t data[]; -} SCreateStreamTableMsg; - +} SDCreateTableMsg; typedef struct { char db[TSDB_TABLE_ID_LEN]; @@ -468,10 +403,10 @@ typedef struct { int32_t sid; uint64_t uid; char meterId[TSDB_TABLE_ID_LEN]; -} SRemoveMeterMsg; +} SDRemoveTableMsg; typedef struct { - short vnode; + int32_t vnode; } SFreeVnodeMsg; typedef struct SColIndexEx { @@ -753,6 +688,7 @@ typedef struct { typedef struct { int32_t dnode; //the ID of dnode int32_t vnode; //the index of vnode + uint32_t ip; } SVPeerDesc; typedef struct { @@ -923,11 +859,11 @@ typedef struct { } SKillQuery, SKillStream, SKillConnection; typedef struct { - short vnode; + int32_t vnode; int32_t sid; uint64_t uid; uint64_t stime; // stream starting time - char status; + int32_t status; } SAlterStreamMsg; #pragma pack(pop) diff --git a/src/inc/trpc.h b/src/inc/trpc.h index d8f8d7bdfb22e43c7ba45dcdd76903e5b8ac54ae..cb5a8aeaa3ac4abb612a0ca5e5ae6d59f9f4dd88 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -19,6 +19,10 @@ extern "C" { #endif +#include +#include +#include "taosdef.h" + #define TAOS_CONN_UDPS 0 #define TAOS_CONN_UDPC 1 #define TAOS_CONN_TCPS 2 @@ -35,8 +39,8 @@ extern "C" { extern int tsRpcHeadSize; typedef struct { - int8_t index; - int8_t numOfIps; + int16_t index; + int16_t numOfIps; uint16_t port; uint32_t ip[TSDB_MAX_MPEERS]; } SRpcIpSet; diff --git a/src/mnode/inc/mgmtDnodeInt.h b/src/mnode/inc/mgmtDnodeInt.h index a81b197f2c75902c4312453c5320aed3651d861c..e9d6aa1a26aa850ac030e36d5e7911c03e60610b 100644 --- a/src/mnode/inc/mgmtDnodeInt.h +++ b/src/mnode/inc/mgmtDnodeInt.h @@ -38,14 +38,11 @@ int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid); char *taosBuildRspMsgToDnode(SDnodeObj *pObj, char type); char *taosBuildReqMsgToDnode(SDnodeObj *pObj, char type); -extern char* (*taosBuildRspMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size); -extern char* (*taosBuildReqMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size); -extern int32_t (*taosSendSimpleRspToDnode)(SDnodeObj *pObj, char rsptype, char code); -extern int32_t (*taosSendMsgToDnode)(SDnodeObj *pObj, char *msg, int32_t msgLen); +extern int32_t (*mgmtSendSimpleRspToDnode)(int32_t msgType, int32_t code); +extern int32_t (*mgmtSendMsgToDnode)(SDnodeObj *pObj, char *msg, int msgLen); extern int32_t (*mgmtInitDnodeInt)(); extern void (*mgmtCleanUpDnodeInt)(); extern void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId); -extern void (*mgmtProcessMsgFromDnodeSpec)(SSchedMsg *sched); #ifdef __cplusplus diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 84090da5cfe7a24cf3afbb4a03d224a763db40b7..4d86baab87881cf63071ad75f29c2690bb2b0193 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -50,7 +50,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { SVgObj * pVgroup; if (!sdbMaster) { - taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_REDIRECT); + mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_REDIRECT); return 0; } @@ -59,7 +59,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { pStart = taosBuildRspMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, 64000); if (pStart == NULL) { - taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); return 0; } @@ -78,7 +78,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { pMsg = mgmtBuildCreateMeterIe(pTable, pMsg, vnode); } else { mTrace("dnode:%s, vnode:%d sid:%d, meter not there", taosIpStr(pObj->privateIp), vnode, sid); - *pMsg = TSDB_CODE_INVALID_METER_ID; + *pMsg = TSDB_CODE_INVALID_TABLE_ID; pMsg++; *(int32_t *)pMsg = htonl(vnode); @@ -88,7 +88,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { } msgLen = pMsg - pStart; - taosSendMsgToDnode(pObj, pStart, msgLen); + mgmtSendMsgToDnode(pObj, pStart, msgLen); return 0; } @@ -100,7 +100,7 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { SVgObj * pVgroup = NULL; if (!sdbMaster) { - taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_REDIRECT); + mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_REDIRECT); return 0; } @@ -108,7 +108,7 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { pStart = taosBuildRspMsgToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP); if (pStart == NULL) { - taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); return 0; } pMsg = pStart; @@ -129,7 +129,7 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { } msgLen = pMsg - pStart; - taosSendMsgToDnode(pObj, pStart, msgLen); + mgmtSendMsgToDnode(pObj, pStart, msgLen); return 0; } @@ -142,7 +142,7 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) { STaosRsp *pRsp = (STaosRsp *)msg; if (!sdbMaster) { - taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP, TSDB_CODE_REDIRECT); + mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP, TSDB_CODE_REDIRECT); return 0; } @@ -251,7 +251,7 @@ int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int int8_t *pMsg = mgmtBuildCreateChildTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode, tagDataLen, pTagData); int32_t msgLen = pMsg - pStart; - taosSendMsgToDnode(pObj, pStart, msgLen); + mgmtSendMsgToDnode(pObj, pStart, msgLen); } pVgroup->lastCreate = timeStamp; @@ -275,7 +275,7 @@ int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) { int8_t *pMsg = mgmtBuildCreateStreamTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode); int32_t msgLen = pMsg - pStart; - taosSendMsgToDnode(pObj, pStart, msgLen); + mgmtSendMsgToDnode(pObj, pStart, msgLen); } pVgroup->lastCreate = timeStamp; @@ -299,7 +299,7 @@ int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) { int8_t *pMsg = mgmtBuildCreateNormalTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode); int32_t msgLen = pMsg - pStart; - taosSendMsgToDnode(pObj, pStart, msgLen); + mgmtSendMsgToDnode(pObj, pStart, msgLen); } pVgroup->lastCreate = timeStamp; @@ -307,7 +307,7 @@ int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) { } int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) { - SRemoveMeterMsg *pRemove; + SDRemoveTableMsg *pRemove; char * pMsg, *pStart; int i, msgLen = 0; SDnodeObj * pObj; @@ -326,15 +326,15 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) { if (pStart == NULL) continue; pMsg = pStart; - pRemove = (SRemoveMeterMsg *)pMsg; + pRemove = (SDRemoveTableMsg *)pMsg; pRemove->vnode = htons(pVgroup->vnodeGid[i].vnode); pRemove->sid = htonl(pTable->gid.sid); memcpy(pRemove->meterId, pTable->meterId, TSDB_TABLE_ID_LEN); - pMsg += sizeof(SRemoveMeterMsg); + pMsg += sizeof(SDRemoveTableMsg); msgLen = pMsg - pStart; - taosSendMsgToDnode(pObj, pStart, msgLen); + mgmtSendMsgToDnode(pObj, pStart, msgLen); tinet_ntoa(ipstr, pVgroup->vnodeGid[i].ip); mTrace("dnode:%s vid:%d, send remove meter msg, sid:%d status:%d", ipstr, pVgroup->vnodeGid[i].vnode, @@ -371,7 +371,7 @@ int mgmtSendAlterStreamMsgToDnode(STabObj *pTable, SVgObj *pVgroup) { pMsg += sizeof(SAlterStreamMsg); msgLen = pMsg - pStart; - taosSendMsgToDnode(pObj, pStart, msgLen); + mgmtSendMsgToDnode(pObj, pStart, msgLen); } return 0; @@ -433,7 +433,7 @@ int mgmtSendVPeersMsg(SVgObj *pVgroup) { pMsg = mgmtBuildVpeersIe(pStart, pVgroup, pVgroup->vnodeGid[i].vnode); msgLen = pMsg - pStart; - taosSendMsgToDnode(pDnode, pStart, msgLen); + mgmtSendMsgToDnode(pDnode, pStart, msgLen); } } @@ -467,7 +467,7 @@ int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid) { pMsg += sizeof(SFreeVnodeMsg); msgLen = pMsg - pStart; - taosSendMsgToDnode(pDnode, pStart, msgLen); + mgmtSendMsgToDnode(pDnode, pStart, msgLen); return 0; } @@ -547,7 +547,7 @@ int mgmtSendCfgDnodeMsg(char *cont) { pMsg += sizeof(SCfgMsg); msgLen = pMsg - pStart; - taosSendMsgToDnode(pDnode, pStart, msgLen); + mgmtSendMsgToDnode(pDnode, pStart, msgLen); #else (void)tsCfgDynamicOptions(pCfg->config); #endif @@ -559,61 +559,46 @@ int mgmtSendCfgDnodeMsg(char *cont) { * functions for communicate between dnode and mnode */ -extern void *dmQhandle; +extern void *tsDnodeMgmtQhandle; void * mgmtStatusTimer = NULL; void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj); -char* taosBuildRspMsgToDnodeWithSizeImp(SDnodeObj *pObj, char type, int32_t size) { - char *pStart = (char *)malloc(size); - if (pStart == NULL) { - return NULL; - } +void mgmtSendMsgToDnodeImpFp(SSchedMsg *sched) { + int8_t msgType = *(int8_t *) (sched->msg - sizeof(int32_t) - sizeof(int8_t)); + int32_t contLen = *(int32_t *) (sched->msg - sizeof(int8_t)); + int8_t *pCont = sched->msg; + void *pConn = NULL; - *pStart = type; - return pStart + 1; + dnodeProcessMsgFromMgmt(pCont, contLen, msgType, pConn); + rpcFreeCont(sched->msg); } -char* (*taosBuildRspMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size) = taosBuildRspMsgToDnodeWithSizeImp; -char* taosBuildReqMsgToDnodeWithSizeImp(SDnodeObj *pObj, char type, int32_t size) { - char *pStart = (char *)malloc(size); - if (pStart == NULL) { - return NULL; - } +int32_t mgmtSendMsgToDnodeImp(int8_t *pCont, int32_t contLen, int8_t msgType) { + mTrace("msg:%s is sent to dnode", taosMsg[msgType]); + *(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType; + *(int32_t *) (pCont - sizeof(int8_t)) = contLen; - *pStart = type; - return pStart + 1; -} -char* (*taosBuildReqMsgToDnodeWithSize)(SDnodeObj *pObj, char type, int32_t size) = taosBuildReqMsgToDnodeWithSizeImp; + SSchedMsg schedMsg = {0}; + schedMsg.fp = mgmtSendMsgToDnodeImpFp; + schedMsg.msg = pCont; -char *taosBuildRspMsgToDnode(SDnodeObj *pObj, char type) { - return taosBuildRspMsgToDnodeWithSize(pObj, type, 256); -} + taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); -char *taosBuildReqMsgToDnode(SDnodeObj *pObj, char type) { - return taosBuildReqMsgToDnodeWithSize(pObj, type, 256); + return TSDB_CODE_SUCCESS; } -int32_t taosSendSimpleRspToDnodeImp(SDnodeObj *pObj, char rsptype, char code) { return 0; } -int32_t (*taosSendSimpleRspToDnode)(SDnodeObj *pObj, char rsptype, char code) = taosSendSimpleRspToDnodeImp; +int32_t (*mgmtSendMsgToDnode)(SDnodeObj *pObj, char *msg, int msgLen) = mgmtSendMsgToDnodeImp; -int32_t taosSendMsgToDnodeImp(SDnodeObj *pObj, char *msg, int32_t msgLen) { - mTrace("msg:%s is sent to dnode", taosMsg[(uint8_t)(*(msg-1))]); +int32_t mgmtSendSimpleRspToDnodeImp(int32_t msgType, int32_t code) { + int8_t *pCont = rpcMallocCont(sizeof(int32_t)); + *(int32_t *) pCont = code; - /* - * Lite version has no message header, so minus one - */ - SSchedMsg schedMsg; - schedMsg.fp = dnodeProcessMsgFromMgmtImp; - schedMsg.msg = msg - 1; - schedMsg.ahandle = NULL; - schedMsg.thandle = NULL; - taosScheduleTask(dmQhandle, &schedMsg); - - return 0; + mgmtSendMsgToDnodeImp(pCont, sizeof(int32_t), msgType); + return TSDB_CODE_SUCCESS; } -int32_t (*taosSendMsgToDnode)(SDnodeObj *pObj, char *msg, int msgLen) = taosSendMsgToDnodeImp; +int32_t (*mgmtSendSimpleRspToDnode)(int32_t msgType, int32_t code) = mgmtSendSimpleRspToDnodeImp; int32_t mgmtInitDnodeIntImp() { return 0; } int32_t (*mgmtInitDnodeInt)() = mgmtInitDnodeIntImp; @@ -682,13 +667,3 @@ void mgmtProcessDnodeStatusImp(void *handle, void *tmrId) { */ } void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId) = mgmtProcessDnodeStatusImp; - -void mgmtProcessMsgFromDnodeSpecImp(SSchedMsg *sched) { - char msgType = *sched->msg; - char *content = sched->msg + 1; - mTrace("msg:%s is received from dnode", taosMsg[(uint8_t)msgType]); - - mgmtProcessMsgFromDnode(content, 0, msgType, mgmtGetDnode(0)); - free(sched->msg); -} -void (*mgmtProcessMsgFromDnodeSpec)(SSchedMsg *sched) = mgmtProcessMsgFromDnodeSpecImp; diff --git a/src/mnode/src/mgmtSystem.c b/src/mnode/src/mgmtSystem.c index 5148975c95971e8a7a7c997e259c058ece8498f5..74cf029055da9b067e036dd2609b18162c8aba1d 100644 --- a/src/mnode/src/mgmtSystem.c +++ b/src/mnode/src/mgmtSystem.c @@ -162,7 +162,10 @@ int mgmtStartSystem() { return 0; } -int32_t mgmtInitSystemImp() { return mgmtStartSystem(); } +int32_t mgmtInitSystemImp() { + return mgmtStartSystem(); +} + int32_t (*mgmtInitSystem)() = mgmtInitSystemImp; int32_t mgmtCheckMgmtRunningImp() { return 0; } @@ -177,6 +180,7 @@ void mgmtStartMgmtTimerImp() { void (*mgmtStartMgmtTimer)() = mgmtStartMgmtTimerImp; void mgmtStopSystemImp() {} + void (*mgmtStopSystem)() = mgmtStopSystemImp; void mgmtCleanUpRedirectImp() {} diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index ce3175abcce26390e4702d70fb16674a5734015d..ffd7ca4aa2a7279238bce491254d6ccd8bf0c4a1 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -296,11 +296,11 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { if (pShow->payloadLen > 0 ) { pTable = mgmtGetTable(pShow->payload); if (NULL == pTable) { - return TSDB_CODE_INVALID_METER_ID; + return TSDB_CODE_INVALID_TABLE_ID; } pVgroup = mgmtGetVgroup(pTable->gid.vgId); - if (NULL == pVgroup) return TSDB_CODE_INVALID_METER_ID; + if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID; maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica; } else { diff --git a/src/modules/monitor/inc/monitor.h b/src/modules/monitor/inc/monitor.h deleted file mode 100644 index 954f4898b195cb90fcb28fb4290af7e24c1a6fbd..0000000000000000000000000000000000000000 --- a/src/modules/monitor/inc/monitor.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef __MONITOR_H__ -#define __MONITOR_H__ - -#include "tglobalcfg.h" -#include "tlog.h" - -#define monitorError(...) \ - if (monitorDebugFlag & DEBUG_ERROR) { \ - tprintf("ERROR MON ", 255, __VA_ARGS__); \ - } -#define monitorWarn(...) \ - if (monitorDebugFlag & DEBUG_WARN) { \ - tprintf("WARN MON ", monitorDebugFlag, __VA_ARGS__); \ - } -#define monitorTrace(...) \ - if (monitorDebugFlag & DEBUG_TRACE) { \ - tprintf("MON ", monitorDebugFlag, __VA_ARGS__); \ - } -#define monitorPrint(...) \ - { tprintf("MON ", 255, __VA_ARGS__); } - -#define monitorLError(...) taosLogError(__VA_ARGS__) monitorError(__VA_ARGS__) -#define monitorLWarn(...) taosLogWarn(__VA_ARGS__) monitorWarn(__VA_ARGS__) -#define monitorLPrint(...) taosLogPrint(__VA_ARGS__) monitorPrint(__VA_ARGS__) - -#endif \ No newline at end of file diff --git a/src/modules/CMakeLists.txt b/src/plugins/CMakeLists.txt similarity index 100% rename from src/modules/CMakeLists.txt rename to src/plugins/CMakeLists.txt diff --git a/src/modules/http/CMakeLists.txt b/src/plugins/http/CMakeLists.txt similarity index 100% rename from src/modules/http/CMakeLists.txt rename to src/plugins/http/CMakeLists.txt diff --git a/src/modules/http/inc/cJSON.h b/src/plugins/http/inc/cJSON.h similarity index 100% rename from src/modules/http/inc/cJSON.h rename to src/plugins/http/inc/cJSON.h diff --git a/src/modules/http/inc/gcHandle.h b/src/plugins/http/inc/gcHandle.h similarity index 100% rename from src/modules/http/inc/gcHandle.h rename to src/plugins/http/inc/gcHandle.h diff --git a/src/modules/http/inc/gcJson.h b/src/plugins/http/inc/gcJson.h similarity index 100% rename from src/modules/http/inc/gcJson.h rename to src/plugins/http/inc/gcJson.h diff --git a/src/modules/http/inc/httpCode.h b/src/plugins/http/inc/httpCode.h similarity index 100% rename from src/modules/http/inc/httpCode.h rename to src/plugins/http/inc/httpCode.h diff --git a/src/modules/http/inc/httpHandle.h b/src/plugins/http/inc/httpHandle.h similarity index 100% rename from src/modules/http/inc/httpHandle.h rename to src/plugins/http/inc/httpHandle.h diff --git a/src/modules/http/inc/httpJson.h b/src/plugins/http/inc/httpJson.h similarity index 100% rename from src/modules/http/inc/httpJson.h rename to src/plugins/http/inc/httpJson.h diff --git a/src/modules/http/inc/httpResp.h b/src/plugins/http/inc/httpResp.h similarity index 100% rename from src/modules/http/inc/httpResp.h rename to src/plugins/http/inc/httpResp.h diff --git a/src/modules/http/inc/httpSystem.h b/src/plugins/http/inc/httpSystem.h similarity index 83% rename from src/modules/http/inc/httpSystem.h rename to src/plugins/http/inc/httpSystem.h index ccf0f3582dd2b1d3a48ce9cc93d505e61b53c879..afe49edb2f76a907ae869aa85a05ab41625a5531 100644 --- a/src/modules/http/inc/httpSystem.h +++ b/src/plugins/http/inc/httpSystem.h @@ -16,9 +16,19 @@ #ifndef TDENGINE_HTTP_SYSTEM_H #define TDENGINE_HTTP_SYSTEM_H -int httpInitSystem(); -int httpStartSystem(); +#ifdef __cplusplus +extern "C" { +#endif + +#include + +int32_t httpInitSystem(); +int32_t httpStartSystem(); void httpStopSystem(); void httpCleanUpSystem(); +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/modules/http/inc/restHandle.h b/src/plugins/http/inc/restHandle.h similarity index 100% rename from src/modules/http/inc/restHandle.h rename to src/plugins/http/inc/restHandle.h diff --git a/src/modules/http/inc/restJson.h b/src/plugins/http/inc/restJson.h similarity index 100% rename from src/modules/http/inc/restJson.h rename to src/plugins/http/inc/restJson.h diff --git a/src/modules/http/inc/tgHandle.h b/src/plugins/http/inc/tgHandle.h similarity index 100% rename from src/modules/http/inc/tgHandle.h rename to src/plugins/http/inc/tgHandle.h diff --git a/src/modules/http/inc/tgJson.h b/src/plugins/http/inc/tgJson.h similarity index 100% rename from src/modules/http/inc/tgJson.h rename to src/plugins/http/inc/tgJson.h diff --git a/src/modules/http/src/cJSON.c b/src/plugins/http/src/cJSON.c similarity index 100% rename from src/modules/http/src/cJSON.c rename to src/plugins/http/src/cJSON.c diff --git a/src/modules/http/src/gcHandle.c b/src/plugins/http/src/gcHandle.c similarity index 100% rename from src/modules/http/src/gcHandle.c rename to src/plugins/http/src/gcHandle.c diff --git a/src/modules/http/src/gcJson.c b/src/plugins/http/src/gcJson.c similarity index 100% rename from src/modules/http/src/gcJson.c rename to src/plugins/http/src/gcJson.c diff --git a/src/modules/http/src/httpAuth.c b/src/plugins/http/src/httpAuth.c similarity index 100% rename from src/modules/http/src/httpAuth.c rename to src/plugins/http/src/httpAuth.c diff --git a/src/modules/http/src/httpCode.c b/src/plugins/http/src/httpCode.c similarity index 100% rename from src/modules/http/src/httpCode.c rename to src/plugins/http/src/httpCode.c diff --git a/src/modules/http/src/httpHandle.c b/src/plugins/http/src/httpHandle.c similarity index 100% rename from src/modules/http/src/httpHandle.c rename to src/plugins/http/src/httpHandle.c diff --git a/src/modules/http/src/httpJson.c b/src/plugins/http/src/httpJson.c similarity index 100% rename from src/modules/http/src/httpJson.c rename to src/plugins/http/src/httpJson.c diff --git a/src/modules/http/src/httpResp.c b/src/plugins/http/src/httpResp.c similarity index 100% rename from src/modules/http/src/httpResp.c rename to src/plugins/http/src/httpResp.c diff --git a/src/modules/http/src/httpServer.c b/src/plugins/http/src/httpServer.c similarity index 100% rename from src/modules/http/src/httpServer.c rename to src/plugins/http/src/httpServer.c diff --git a/src/modules/http/src/httpSession.c b/src/plugins/http/src/httpSession.c similarity index 100% rename from src/modules/http/src/httpSession.c rename to src/plugins/http/src/httpSession.c diff --git a/src/modules/http/src/httpSql.c b/src/plugins/http/src/httpSql.c similarity index 100% rename from src/modules/http/src/httpSql.c rename to src/plugins/http/src/httpSql.c diff --git a/src/modules/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c similarity index 100% rename from src/modules/http/src/httpSystem.c rename to src/plugins/http/src/httpSystem.c diff --git a/src/modules/http/src/httpUtil.c b/src/plugins/http/src/httpUtil.c similarity index 100% rename from src/modules/http/src/httpUtil.c rename to src/plugins/http/src/httpUtil.c diff --git a/src/modules/http/src/restHandle.c b/src/plugins/http/src/restHandle.c similarity index 100% rename from src/modules/http/src/restHandle.c rename to src/plugins/http/src/restHandle.c diff --git a/src/modules/http/src/restJson.c b/src/plugins/http/src/restJson.c similarity index 100% rename from src/modules/http/src/restJson.c rename to src/plugins/http/src/restJson.c diff --git a/src/modules/http/src/tgHandle.c b/src/plugins/http/src/tgHandle.c similarity index 100% rename from src/modules/http/src/tgHandle.c rename to src/plugins/http/src/tgHandle.c diff --git a/src/modules/http/src/tgJson.c b/src/plugins/http/src/tgJson.c similarity index 100% rename from src/modules/http/src/tgJson.c rename to src/plugins/http/src/tgJson.c diff --git a/src/modules/monitor/CMakeLists.txt b/src/plugins/monitor/CMakeLists.txt similarity index 100% rename from src/modules/monitor/CMakeLists.txt rename to src/plugins/monitor/CMakeLists.txt diff --git a/src/dnode/inc/dnodeUtil.h b/src/plugins/monitor/inc/monitorSystem.h similarity index 73% rename from src/dnode/inc/dnodeUtil.h rename to src/plugins/monitor/inc/monitorSystem.h index dfb34b4b74a239ec74d0b3f128f0647051972449..6fd570947507136f6f4acc445c0e70c51d03f2d5 100644 --- a/src/dnode/inc/dnodeUtil.h +++ b/src/plugins/monitor/inc/monitorSystem.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_DNODE_UTIL_H -#define TDENGINE_DNODE_UTIL_H +#ifndef TDENGINE_MONITOR_SYSTEM_H +#define TDENGINE_MONITOR_SYSTEM_H #ifdef __cplusplus extern "C" { @@ -22,18 +22,14 @@ extern "C" { #include #include -#include "taosdef.h" -#include "taosmsg.h" -#include "tstatus.h" -EVnodeStatus dnodeGetVnodeStatus(int32_t vnode); - -bool dnodeCheckVnodeExist(int32_t vnode); - -void *dnodeGetVnodeObj(int32_t vnode); +int32_t monitorInitSystem(); +int32_t monitorStartSystem(); +void monitorStopSystem(); +void monitorCleanUpSystem(); #ifdef __cplusplus } #endif -#endif +#endif \ No newline at end of file diff --git a/src/modules/monitor/src/monitorSystem.c b/src/plugins/monitor/src/monitorSystem.c similarity index 100% rename from src/modules/monitor/src/monitorSystem.c rename to src/plugins/monitor/src/monitorSystem.c diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt index 70511db53369fc0e69ba870bd8c3c151e580cbe7..e8768c10dd074915e940599f609e5fb0491e4365 100644 --- a/src/rpc/CMakeLists.txt +++ b/src/rpc/CMakeLists.txt @@ -23,3 +23,6 @@ ENDIF () ADD_LIBRARY(trpc ${SRC}) TARGET_LINK_LIBRARIES(trpc tutil) + +ADD_SUBDIRECTORY(test) + diff --git a/src/rpc/inc/thaship.h b/src/rpc/inc/thaship.h index 4acf8b3fbbab14702b3f4e98dc40b0f68fa3a3e2..9d4396ce6a29ec0bd5ccb14d9b0e0713e951cceb 100644 --- a/src/rpc/inc/thaship.h +++ b/src/rpc/inc/thaship.h @@ -16,10 +16,18 @@ #ifndef _rpc_hash_ip_header_ #define _rpc_hash_ip_header_ +#ifdef __cplusplus +extern "C" { +#endif + void *taosOpenIpHash(int maxSessions); void taosCloseIpHash(void *handle); void *taosAddIpHash(void *handle, void *pData, uint32_t ip, uint16_t port); void taosDeleteIpHash(void *handle, uint32_t ip, uint16_t port); void *taosGetIpHash(void *handle, uint32_t ip, uint16_t port); +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/rpc/inc/ttcpclient.h b/src/rpc/inc/ttcpclient.h index 1246b2560ef4e135bdc92e9eb7e34e6a2676d450..952d1c4a0ecb0d071bb7b96140d8c58e865924e9 100644 --- a/src/rpc/inc/ttcpclient.h +++ b/src/rpc/inc/ttcpclient.h @@ -16,6 +16,10 @@ #ifndef _taos_tcp_client_header_ #define _taos_tcp_client_header_ +#ifdef __cplusplus +extern "C" { +#endif + #include "taosdef.h" void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle); @@ -24,4 +28,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16 void taosCloseTcpClientConnection(void *chandle); int taosSendTcpClientData(uint32_t ip, uint16_t port, char *data, int len, void *chandle); +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/rpc/inc/ttcpserver.h b/src/rpc/inc/ttcpserver.h index b62949e73e8ca38503eb1d2fb46e5cbdfd3042ef..299adb31694b14c77819d09fb5512c0c2b20e930 100644 --- a/src/rpc/inc/ttcpserver.h +++ b/src/rpc/inc/ttcpserver.h @@ -16,6 +16,10 @@ #ifndef _taos_tcp_server_header_ #define _taos_tcp_server_header_ +#ifdef __cplusplus +extern "C" { +#endif + #include "taosdef.h" void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); @@ -23,4 +27,8 @@ void taosCleanUpTcpServer(void *param); void taosCloseTcpServerConnection(void *param); int taosSendTcpServerData(uint32_t ip, uint16_t port, char *data, int len, void *chandle); +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/rpc/inc/tudp.h b/src/rpc/inc/tudp.h index 647d54badeed8ea08336fff113d56168df662f9e..cb2f8d2b1084ea25d9ea737f2549a2b206cbcd39 100644 --- a/src/rpc/inc/tudp.h +++ b/src/rpc/inc/tudp.h @@ -16,6 +16,10 @@ #ifndef _taos_udp_header_ #define _taos_udp_header_ +#ifdef __cplusplus +extern "C" { +#endif + #include "taosdef.h" void *taosInitUdpServer(char *ip, uint16_t port, char *label, int, void *fp, void *shandle); @@ -30,4 +34,8 @@ void taosSendMsgHdr(void *hdr, int fd); void taosInitMsgHdr(void **hdr, void *dest, int maxPkts); void taosSetMsgHdrData(void *hdr, char *data, int dataLen); +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index bb2c466324c73871deb4685a06c957d95ae35e58..e3db06d952972e4e4069e7feade2afec3ae8ef8c 100755 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -30,6 +30,7 @@ #include "lz4.h" #include "tconncache.h" #include "trpc.h" +#include "taoserror.h" #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest)) #define rpcHeaderFromCont(cont) ((SRpcHeader *) (cont - sizeof(SRpcHeader))) diff --git a/src/rpc/src/tstring.c b/src/rpc/src/tstring.c index 366d3443f6bedadf2c8251e2c3f8f017bdb78c90..536e8489ac9eac61be01689b58d27a482df65100 100644 --- a/src/rpc/src/tstring.c +++ b/src/rpc/src/tstring.c @@ -26,30 +26,8 @@ char *taosMsg[] = { "create-table", "create-table-rsp", //10 - "create-normal-table", - "create-normal-table-rsp", - "create-stream-table", - "create-stream-table-rsp", - "create-super-table", - "create-super-table-rsp", "remove-table", "remove-table-rsp", - "remove-normal-table", - "remove-normal-table-rsp", //20 - - "remove-stream-table", - "remove-stream-table-rsp", - "remove-super-table", - "remove-super-table-rsp", - "alter-table", - "alter-table-rsp", - "alter-normal-table", - "alter-normal-table-rsp", - "alter-stream-table", - "alter-stream-table-rsp", //30 - - "alter-super-table", - "alter-super-table-rsp", "vpeers", "vpeers-rsp", "free-vnode", @@ -57,7 +35,7 @@ char *taosMsg[] = { "cfg-dnode", "cfg-dnode-rsp", "alter-stream", - "alter-stream-rsp", //40 + "alter-stream-rsp", //20 "sync", "sync-rsp", @@ -68,7 +46,7 @@ char *taosMsg[] = { "", "", "", - "", //50 + "", //30 "connect", "connect-rsp", @@ -79,7 +57,7 @@ char *taosMsg[] = { "drop-acct", "drop-acct-rsp", "create-user", - "create-user-rsp", //60 + "create-user-rsp", //40 "alter-user", "alter-user-rsp", @@ -90,7 +68,7 @@ char *taosMsg[] = { "drop-mnode", "drop-mnode-rsp", "create-dnode", - "create-dnode-rsp", //70 + "create-dnode-rsp", //50 "drop-dnode", "drop-dnode-rsp", @@ -101,7 +79,7 @@ char *taosMsg[] = { "drop-db", "drop-db-rsp", "use-db", - "use-db-rsp", //80 + "use-db-rsp", //60 "alter-db", "alter-db-rsp", @@ -112,7 +90,7 @@ char *taosMsg[] = { "alter-table", "alter-table-rsp", "cfg-vnode", - "cfg-vnode-rsp", //90 + "cfg-vnode-rsp", //70 "cfg-table", "cfg-table-rsp", @@ -123,7 +101,7 @@ char *taosMsg[] = { "multi-table-meta", "multi-table-meta-rsp", "alter-stream", - "alter-stream-rsp", //100 + "alter-stream-rsp", //80 "show", "show-rsp", @@ -134,7 +112,7 @@ char *taosMsg[] = { "kill-stream", "kill-stream-rsp", "kill-connection", - "kill-connectoin-rsp", //110 + "kill-connectoin-rsp", //90 "heart-beat", "heart-beat-rsp", diff --git a/src/rpc/test/CMakeLists.txt b/src/rpc/test/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..f1e7a1cd390b08893b8323aab08c47c7e7e9c305 --- /dev/null +++ b/src/rpc/test/CMakeLists.txt @@ -0,0 +1,15 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(../inc) + AUX_SOURCE_DIRECTORY(./ TEST_SRC) + + ADD_EXECUTABLE(rpcTest ${TEST_SRC}) + TARGET_LINK_LIBRARIES(rpcTest trpc) +ENDIF () + + diff --git a/src/rpc/test/unittest.c b/src/rpc/test/unittest.c new file mode 100644 index 0000000000000000000000000000000000000000..262c93a62b95432ab81b68fa76d184446ea4cc8c --- /dev/null +++ b/src/rpc/test/unittest.c @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//#define _DEFAULT_SOURCE +#include "os.h" +#include "tlog.h" +#include "trpc.h" +#include + +int32_t main(int32_t argc, char *argv[]) { + dPrint("unit test for rpc module"); + + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localIp = "0.0.0.0"; + rpcInit.localPort = 7000; + rpcInit.label = "unittest"; + rpcInit.numOfThreads = 1; + rpcInit.fp = NULL; + rpcInit.sessions = 1000; + rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S(); + rpcInit.idleTime = 2000; + + void *pConn = rpcOpen(&rpcInit); + if (pConn != NULL) { + dPrint("conection is opened"); + } else { + dError("failed to initialize rpc"); + } + + return 0; +} + + diff --git a/src/util/inc/tlog.h b/src/util/inc/tlog.h index 7556cc50a1ff23d24ef8eafdcd677f65577c8713..8bd4333f16c619b83e9aaba7cd52fa029213bb22 100644 --- a/src/util/inc/tlog.h +++ b/src/util/inc/tlog.h @@ -197,6 +197,48 @@ extern uint32_t cdebugFlag; #define mLWarn(...) taosLogWarn(__VA_ARGS__) mWarn(__VA_ARGS__) #define mLPrint(...) taosLogPrint(__VA_ARGS__) mPrint(__VA_ARGS__) +#define httpError(...) \ + if (httpDebugFlag & DEBUG_ERROR) { \ + tprintf("ERROR HTP ", 255, __VA_ARGS__); \ + } +#define httpWarn(...) \ + if (httpDebugFlag & DEBUG_WARN) { \ + tprintf("WARN HTP ", httpDebugFlag, __VA_ARGS__); \ + } +#define httpTrace(...) \ + if (httpDebugFlag & DEBUG_TRACE) { \ + tprintf("HTP ", httpDebugFlag, __VA_ARGS__); \ + } +#define httpDump(...) \ + if (httpDebugFlag & DEBUG_TRACE) { \ + taosPrintLongString("HTP ", httpDebugFlag, __VA_ARGS__); \ + } +#define httpPrint(...) \ + { tprintf("HTP ", 255, __VA_ARGS__); } + +#define httpLError(...) taosLogError(__VA_ARGS__) httpError(__VA_ARGS__) +#define httpLWarn(...) taosLogWarn(__VA_ARGS__) httpWarn(__VA_ARGS__) +#define httpLPrint(...) taosLogPrint(__VA_ARGS__) httpPrint(__VA_ARGS__) + +#define monitorError(...) \ + if (monitorDebugFlag & DEBUG_ERROR) { \ + tprintf("ERROR MON ", 255, __VA_ARGS__); \ + } +#define monitorWarn(...) \ + if (monitorDebugFlag & DEBUG_WARN) { \ + tprintf("WARN MON ", monitorDebugFlag, __VA_ARGS__); \ + } +#define monitorTrace(...) \ + if (monitorDebugFlag & DEBUG_TRACE) { \ + tprintf("MON ", monitorDebugFlag, __VA_ARGS__); \ + } +#define monitorPrint(...) \ + { tprintf("MON ", 255, __VA_ARGS__); } + +#define monitorLError(...) taosLogError(__VA_ARGS__) monitorError(__VA_ARGS__) +#define monitorLWarn(...) taosLogWarn(__VA_ARGS__) monitorWarn(__VA_ARGS__) +#define monitorLPrint(...) taosLogPrint(__VA_ARGS__) monitorPrint(__VA_ARGS__) + #ifdef __cplusplus } #endif