diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 498dcddd049acb69bef33910dd7d0b0972fd08fa..1fe4ba2979a507b7044c581450db1cb0fbdb01ed 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -545,7 +545,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { * There is not response callback function for submit response. * The actual inserted number of points is the first number. */ - if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) { + if (pMsg->msgType == TSDB_MSG_TYPE_DNODE_SUBMIT_RSP) { pRes->numOfRows += *(int32_t *)pRes->pRsp; tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code, @@ -1464,7 +1464,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += sizeof(pQueryInfo->type); pSql->cmd.payloadLen = pMsg - pStart; - pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE; + pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_RETRIEVE; return TSDB_CODE_SUCCESS; } @@ -1503,7 +1503,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here - pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; + pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_SUBMIT; tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip), htons(pShellMsg->vnode)); @@ -1900,7 +1900,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tscTrace("%p msg built success,len:%d bytes", pSql, msgLen); pCmd->payloadLen = msgLen; - pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; + pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_QUERY; assert(msgLen + minMsgSize() <= size); @@ -2041,7 +2041,7 @@ int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += sizeof(SCfgMsg); pCmd->payloadLen = pMsg - pStart; - pCmd->msgType = TSDB_MSG_TYPE_CFG_PNODE; + pCmd->msgType = TSDB_MSG_TYPE_DNODE_CFG; return TSDB_CODE_SUCCESS; } @@ -2480,7 +2480,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) { msgLen = pMsg - pStart; pCmd->payloadLen = msgLen; - pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE; + pCmd->msgType = TSDB_MSG_TYPE_DNODE_RETRIEVE; return TSDB_CODE_SUCCESS; } @@ -2660,7 +2660,7 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { msgLen = pMsg - pStart; pCmd->payloadLen = msgLen; - pCmd->msgType = TSDB_MSG_TYPE_METERINFO; + pCmd->msgType = TSDB_MSG_TYPE_TABLE_META; tfree(tmpData); @@ -2698,7 +2698,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tfree(tmpData); pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiMeterInfoMsg); - pCmd->msgType = TSDB_MSG_TYPE_MULTI_METERINFO; + pCmd->msgType = TSDB_MSG_TYPE_MULTI_TABLE_META; assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize); @@ -2866,7 +2866,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { msgLen = pMsg - pStart; pCmd->payloadLen = msgLen; - pCmd->msgType = TSDB_MSG_TYPE_METRIC_META; + pCmd->msgType = TSDB_MSG_TYPE_STABLE_META; assert(msgLen + minMsgSize() <= size); return TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 47fd4da2a44f16746d1a98115ebbf587e0499339..aaf459a2d6a791edfeef21833f9456a37d7a9eed 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -118,7 +118,7 @@ bool tscQueryOnMetric(SSqlCmd* pCmd) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); return ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) == TSDB_QUERY_TYPE_STABLE_QUERY) && - (pCmd->msgType == TSDB_MSG_TYPE_QUERY); + (pCmd->msgType == TSDB_MSG_TYPE_DNODE_QUERY); } bool tscQueryMetricTags(SQueryInfo* pQueryInfo) { diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 53cef1412f346c44bcc00ef45e62b1e7424af22f..30bc8a2ad35a87085419fd64f076522bd8ce6674 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -25,10 +25,10 @@ extern "C" { #include "tsched.h" #include "dnode.h" -int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj); -int vnodeProcessRemoveMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj); +int dnodeProcessCreateTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj); +int dnodeProcessRemoveTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj); -void dnodeDistributeMsgFromMgmt(char *content, int msgLen, int msgType, SMgmtObj *pObj); +void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); extern void *dmQhandle; diff --git a/src/dnode/inc/dnodeRead.h b/src/dnode/inc/dnodeRead.h index 9e8183ca7190f5c466100af85d2fd024868ff86a..ff7a8816e02cd6c706345c40a15ab057a3ff8ece 100644 --- a/src/dnode/inc/dnodeRead.h +++ b/src/dnode/inc/dnodeRead.h @@ -24,19 +24,37 @@ extern "C" { #include #include "taosdef.h" #include "taosmsg.h" -#include "dnodeShell.h" -void dnodeFreeQInfoInQueue(SShellObj *pShellObj); +/* + * Clear query information associated with this connection + */ +void dnodeFreeQInfo(void *pConn); + +/* + * Clear all query informations + */ +void dnodeFreeQInfos(); + +/* + * handle query message, and the result is returned by callback function + */ +void dnodeQueryData(SQueryMeterMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn)); /* - * Dnode handle read messages - * The processing result is returned by callback function with pShellObj parameter + * Dispose retrieve msg, and the result will passed through callback function */ -int32_t dnodeReadData(SQueryMeterMsg *msg, void *pShellObj, void (*callback)(SQueryMeterRsp *rspMsg, void *pShellObj)); +typedef void (*SDnodeRetrieveCallbackFp)(int32_t code, void *pQInfo, void *pConn); +void dnodeRetrieveData(SRetrieveMeterMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp); -typedef void (*SDnodeRetrieveCallbackFp)(int32_t code, SRetrieveMeterRsp *pRetrieveRspMsg, void *pShellObj); +/* + * Fill retrieve result according to query info + */ +int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveMeterRsp *retrievalRsp); -void dnodeRetrieveData(SRetrieveMeterMsg *pMsg, int32_t msgLen, void *pShellObj, SDnodeRetrieveCallbackFp callback); +/* + * Get the size of retrieve result according to query info + */ +int32_t dnodeGetRetrieveDataSize(void *pQInfo); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeWrite.h b/src/dnode/inc/dnodeWrite.h index d10ff8570c5b39eea1618c3958b997ad5148339c..6f74dee879b441f657792b789df46124eaedc7c0 100644 --- a/src/dnode/inc/dnodeWrite.h +++ b/src/dnode/inc/dnodeWrite.h @@ -27,17 +27,12 @@ extern "C" { /* * Write data based on dnode, the detail result can be fetched from rsponse - * pSubmitMsg: Data to be written - * pShellObj: Used to pass a communication handle - * callback: Pass the write result through a callback function, possibly in a different thread space - * rsp: will not be freed by callback function + * pSubmit: Data to be written + * pConn: Communication handle + * callback: Pass the write result through a callback function, possibly in a different thread space + * rsp: will not be freed by callback function */ -void dnodeWriteData(SShellSubmitMsg *pMsg, void *pShellObj, void (*callback)(SShellSubmitRspMsg *rsp, void *pShellObj)); - -/* - * Check if table already exists - */ -int32_t dnodeCheckTableExist(char *tableId); +void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)); /* * Create noraml table with specified configuration and open it diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 6d7dc1d0326b61e2764fbd150d33491a6f874f43..72728f90addcedbee0b4f525a72d51b79f15a521 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -22,28 +22,28 @@ #include "dnodeMgmt.h" #include "taosmsg.h" +#include "tlog.h" #include "trpc.h" #include "tsched.h" #include "tsystem.h" -#include "vnode.h" -#include "vnodeSystem.h" -#include "vnodeUtil.h" -#include "vnodeStatus.h" SMgmtObj mgmtObj; extern uint64_t tsCreatedTime; -int vnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj); +int dnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj); int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen); -int vnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj); -int vnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj); -int vnodeProcessMeterCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj); -int vnodeProcessCfgDnodeRequest(char *cont, int contLen, SMgmtObj *pMgmtObj); -int vnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj); +int dnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj); +int dnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj); +int dnodeProcessTableCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj); +int dnodeProcessDnodeCfgRequest(char *cont, int contLen, SMgmtObj *pMgmtObj); +int dnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj); void vnodeUpdateHeadFile(int vnode, int oldTables, int newTables); void vnodeOpenVnode(int vnode); void vnodeCleanUpOneVnode(int vnode); +static int (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, void *pConn); +static void dnodeInitProcessShellMsg(); + char *taosBuildRspMsgToMnodeWithSizeImp(SMgmtObj *pObj, char type, int size) { char *pStart = (char *)malloc(size); if (pStart == NULL) { @@ -106,48 +106,40 @@ int taosSendSimpleRspToMnodeImp(SMgmtObj *pObj, char rsptype, char code) { } int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code) = taosSendSimpleRspToMnodeImp; -int32_t dnodeInitMgmtImp() { return 0; } +int32_t dnodeInitMgmtImp() { + dnodeInitProcessShellMsg(); + return 0; +} + int32_t (*dnodeInitMgmt)() = dnodeInitMgmtImp; void dnodeInitMgmtIpImp() {} + void (*dnodeInitMgmtIp)() = dnodeInitMgmtIpImp; void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched) { - char msgType = *sched->msg; - char *content = sched->msg + 1; + int32_t msgType = *(int32_t*)(sched->msg); + int8_t *content = sched->msg + sizeof(int32_t); - dTrace("msg:%s is received from mgmt", taosMsg[(uint8_t)msgType]); - - dnodeDistributeMsgFromMgmt(content, 0, msgType, 0); + dTrace("msg:%s is received from mgmt", taosMsg[msgType]); + dnodeDistributeMsgFromMgmt(content, 0, msgType, NULL); free(sched->msg); } -void dnodeDistributeMsgFromMgmt(char *content, int msgLen, int msgType, SMgmtObj *pObj) { - if (msgType == TSDB_MSG_TYPE_CREATE) { - vnodeProcessCreateMeterRequest(content, msgLen, pObj); - } else if (msgType == TSDB_MSG_TYPE_VPEERS) { - vnodeProcessVPeersMsg(content, msgLen, pObj); - } else if (msgType == TSDB_MSG_TYPE_VPEER_CFG_RSP) { - vnodeProcessVPeerCfgRsp(content, msgLen, pObj); - } else if (msgType == TSDB_MSG_TYPE_METER_CFG_RSP) { - vnodeProcessMeterCfgRsp(content, msgLen, pObj); - } else if (msgType == TSDB_MSG_TYPE_REMOVE) { - vnodeProcessRemoveMeterRequest(content, msgLen, pObj); - } else if (msgType == TSDB_MSG_TYPE_FREE_VNODE) { - vnodeProcessFreeVnodeRequest(content, msgLen, pObj); - } else if (msgType == TSDB_MSG_TYPE_CFG_PNODE) { - vnodeProcessCfgDnodeRequest(content, msgLen, pObj); - } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM) { - vnodeProcessAlterStreamRequest(content, msgLen, pObj); - } else if (msgType == TSDB_MSG_TYPE_GRANT_RSP) { - // do nothing +void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn) { + if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { + dError("invalid msg type:%d", msgType); } else { - dError("%s is not processed", taosMsg[msgType]); + if (dnodeProcessShellMsgFp[msgType]) { + (*dnodeProcessShellMsgFp[msgType])(pConn, contLen, pConn); + } else { + dError("%s is not processed", taosMsg[msgType]); + } } } -int vnodeProcessMeterCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) { +int dnodeProcessTableCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) { int code = *pMsg; if (code == 0) { @@ -166,7 +158,7 @@ int vnodeProcessMeterCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) { return 0; } -int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pObj) { +int dnodeProcessCreateTableRequest(char *pMsg, int msgLen, SMgmtObj *pObj) { SCreateMsg *pCreate; int code = 0; int vid; @@ -196,12 +188,12 @@ int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pObj) { // } _over: - taosSendSimpleRspToMnode(pObj, TSDB_MSG_TYPE_CREATE_RSP, code); + taosSendSimpleRspToMnode(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE_RSP, code); return code; } -int vnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj) { +int dnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj) { SAlterStreamMsg *pAlter; int code = 0; int vid, sid; @@ -356,7 +348,7 @@ _create_over: return code; } -int vnodeProcessRemoveMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) { +int dnodeProcessRemoveTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) { SMeterObj * pObj; SRemoveMeterMsg *pRemove; int code = 0; @@ -476,7 +468,7 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { return 0; } -int vnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj) { +int dnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj) { STaosRsp *pRsp; pRsp = (STaosRsp *)msg; @@ -497,7 +489,7 @@ int vnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj) { return 0; } -int vnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { +int dnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { int code = 0; code = vnodeProcessVPeerCfg(msg, msgLen, pMgmtObj); @@ -506,7 +498,7 @@ int vnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { STaosRsp * pRsp; SVPeersMsg *pVPeersMsg = (SVPeersMsg *)msg; - pStart = taosBuildRspMsgToMnode(pMgmtObj, TSDB_MSG_TYPE_VPEERS_RSP); + pStart = taosBuildRspMsgToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP); if (pStart == NULL) return -1; pRsp = (STaosRsp *)pStart; @@ -519,7 +511,7 @@ int vnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { return code; } -int vnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) { +int dnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) { SFreeVnodeMsg *pFree; pFree = (SFreeVnodeMsg *)pMsg; @@ -534,16 +526,16 @@ int vnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) { int32_t code = vnodeRemoveVnode(pFree->vnode); assert(code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS); - taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_FREE_VNODE_RSP, code); + taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP, code); return 0; } -int vnodeProcessCfgDnodeRequest(char *cont, int contLen, SMgmtObj *pMgmtObj) { +int dnodeProcessDnodeCfgRequest(char *cont, int contLen, SMgmtObj *pMgmtObj) { SCfgMsg *pCfg = (SCfgMsg *)cont; int code = tsCfgDynamicOptions(pCfg->config); - taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_CFG_PNODE_RSP, code); + taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_CFG_RSP, code); return 0; } @@ -554,7 +546,7 @@ void dnodeSendVpeerCfgMsg(int32_t vnode) { SVpeerCfgMsg *pCfg; SMgmtObj * pObj = &mgmtObj; - pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_VPEER_CFG); + pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_VNODE_CFG); if (pStart == NULL) return; pMsg = pStart; @@ -572,7 +564,7 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { SMeterCfgMsg *pCfg; SMgmtObj * pObj = &mgmtObj; - pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_METER_CFG); + pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_TABLE_CFG); if (pStart == NULL) return -1; pMsg = pStart; @@ -585,3 +577,18 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { return taosSendMsgToMnode(pObj, pStart, msgLen); } + +void dnodeInitProcessShellMsg() { + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE] = dnodeProcessCreateTableRequest; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE] = dnodeProcessCreateTableRequest; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE] = dnodeProcessCreateTableRequest; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE] = dnodeProcessRemoveTableRequest; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE] = dnodeProcessRemoveTableRequest; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE] = dnodeProcessRemoveTableRequest; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_VPEERS] = dnodeProcessVPeersMsg; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_FREE_VNODE] = dnodeProcessFreeVnodeRequest; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp; +} \ No newline at end of file diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 6ed150994a301f9dbaad429302be4035cc44ccc3..5173d8624d828247f9171265f22308f341d8cae4 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -17,45 +17,51 @@ #include "os.h" #include "taoserror.h" #include "tlog.h" -#include "dnodeWrite.h" #include "dnode.h" #include "dnodeRead.h" #include "dnodeSystem.h" -void dnodeFreeQInfoInQueue(SShellObj *pShellObj) { -} +void dnodeFreeQInfo(void *pConn) {} + +void dnodeFreeQInfos() {} +void dnodeQueryData(SQueryMeterMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn)) { + void *pQInfo = NULL; + int code = TSDB_CODE_SUCCESS; + callback(code, pConn, pQInfo); +} -void dnodeExecuteRetrieveData(SSchedMsg *pSched) { +static void dnodeExecuteRetrieveData(SSchedMsg *pSched) { SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *)pSched->msg; SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle; - SShellObj *pObj = (SShellObj *)pSched->ahandle; - SRetrieveMeterRsp result = {0}; - - /* - * in case of server restart, apps may hold qhandle created by server before restart, - * which is actually invalid, therefore, signature check is required. - */ - if (pRetrieve->qhandle != (uint64_t)pObj->qhandle) { - // if free flag is set, client wants to clean the resources - dError("QInfo:%p, qhandle:%p is not matched with saved:%p", pObj->qhandle, pRetrieve->qhandle, pObj->qhandle); - int32_t code = TSDB_CODE_INVALID_QHANDLE; - (*callback)(code, &result, pObj); - } + void *pConn = pSched->ahandle; + + //examples + int32_t code = TSDB_CODE_INVALID_QHANDLE; + void *pQInfo = NULL; //get from pConn + (*callback)(code, NULL, pConn); //TODO build response here free(pSched->msg); } -void dnodeRetrieveData(SRetrieveMeterMsg *pMsg, int32_t msgLen, void *pShellObj, SDnodeRetrieveCallbackFp callback) { - int8_t *msg = malloc(msgLen); - memcpy(msg, pMsg, msgLen); +void dnodeRetrieveData(SRetrieveMeterMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp) { + int8_t *msg = malloc(sizeof(pRetrieve)); + memcpy(msg, pRetrieve, sizeof(pRetrieve)); SSchedMsg schedMsg; schedMsg.msg = msg; - schedMsg.ahandle = pShellObj; - schedMsg.thandle = callback; + schedMsg.ahandle = pConn; + schedMsg.thandle = callbackFp; schedMsg.fp = dnodeExecuteRetrieveData; taosScheduleTask(tsQueryQhandle, &schedMsg); } + +int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveMeterRsp *retrievalRsp) { + return 0; +} + +int32_t dnodeGetRetrieveDataSize(void *pQInfo) {} + + diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 000b51f244769adcf31ac7b227e837a4a86626ba..c873f11eaa4694eb351c0235556518381fbd922e 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -32,334 +32,155 @@ #include "dnodeUtil.h" #include "dnodeWrite.h" -static void dnodeProcessRetrieveRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj); -static void dnodeProcessQueryRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj); -static void dnodeProcessShellSubmitRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj); - -static void *tsDnodeShellServer = NULL; -static SShellObj *tsDnodeShellList = NULL; -static int32_t tsDnodeSelectReqNum = 0; -static int32_t tsDnodeInsertReqNum = 0; -static int32_t tsDnodeShellConns = 0; - -#define NUM_OF_SESSIONS_PER_VNODE (300) -#define NUM_OF_SESSIONS_PER_DNODE (NUM_OF_SESSIONS_PER_VNODE * TSDB_MAX_VNODES) - -void *dnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { - int sid, vnode; - SShellObj *pObj = (SShellObj *)ahandle; - SIntMsg * pMsg = (SIntMsg *)msg; - uint32_t peerId, peerIp; - uint16_t peerPort; - char ipstr[20]; - - if (msg == NULL) { - if (pObj) { - pObj->thandle = NULL; - dTrace("QInfo:%p %s free qhandle", pObj->qhandle, __FUNCTION__); - dnodeFreeQInfoInQueue(pObj); - pObj->qhandle = NULL; - tsDnodeShellConns--; - dTrace("shell connection:%d is gone, shellConns:%d", pObj->sid, tsDnodeShellConns); - } - return NULL; - } +static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn); +static void dnodeProcessQueryRequest(int8_t *pCont, int32_t contLen, void *pConn); +static void dnodeProcessShellSubmitRequest(int8_t *pCont, int32_t contLen, void *pConn); - taosGetRpcConnInfo(thandle, &peerId, &peerIp, &peerPort, &vnode, &sid); +static void *tsDnodeShellServer = NULL; +static int32_t tsDnodeQueryReqNum = 0; +static int32_t tsDnodeSubmitReqNum = 0; - if (pObj == NULL) { - pObj = tsDnodeShellList + sid; - pObj->thandle = thandle; - pObj->sid = sid; - pObj->ip = peerIp; - tinet_ntoa(ipstr, peerIp); - tsDnodeShellConns--; - dTrace("shell connection:%d from ip:%s is created, shellConns:%d", sid, ipstr, tsDnodeShellConns); - } else { - if (pObj != tsDnodeShellList + sid) { - dError("shell connection:%d, pObj:%p is not matched with:%p", sid, pObj, tsDnodeShellList + sid); - return NULL; - } - } +void dnodeProcessMsgFromShell(int32_t msgType, void *pCont, int32_t contLen, void *handle, int32_t index) { + assert(handle != NULL); - dTrace("vid:%d sid:%d, msg:%s is received pConn:%p", vnode, sid, taosMsg[pMsg->msgType], thandle); + if (pCont == NULL || contLen == 0) { + dnodeFreeQInfo(handle); + dTrace("conn:%p, free query info", handle); + return; + } if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { - taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY); - dTrace("sid:%d, shell query msg is ignored since dnode not running", sid); - return pObj; + rpcSendSimpleRsp(handle, TSDB_CODE_NOT_READY); + dTrace("conn:%p, query msg is ignored since dnode not running", handle); + return; } - if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) { - dnodeProcessQueryRequest(pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); - } else if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { - dnodeProcessRetrieveRequest(pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); - } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { - dnodeProcessShellSubmitRequest(pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); + dTrace("conn:%p, msg:%s is received", handle, taosMsg[msgType]); + + if (msgType == TSDB_MSG_TYPE_DNODE_QUERY) { + dnodeProcessQueryRequest(pCont, contLen, handle); + } else if (msgType == TSDB_MSG_TYPE_DNODE_RETRIEVE) { + dnodeProcessRetrieveRequest(pCont, contLen, handle); + } else if (msgType == TSDB_MSG_TYPE_DNODE_SUBMIT) { + dnodeProcessShellSubmitRequest(pCont, contLen, handle); } else { - dError("%s is not processed", taosMsg[pMsg->msgType]); + dError("conn:%p, msg:%s is not processed", handle, taosMsg[msgType]); } - - return pObj; } int32_t dnodeInitShell() { - SRpcInit rpcInit; - - int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; - numOfThreads = (1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0; + int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; + numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); if (numOfThreads < 1) { numOfThreads = 1; } + SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - - rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp; - - rpcInit.localPort = tsVnodeShellPort; - rpcInit.label = "DND-shell"; + rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp; + rpcInit.localPort = tsVnodeShellPort; + rpcInit.label = "DND-shell"; rpcInit.numOfThreads = numOfThreads; - rpcInit.fp = dnodeProcessMsgFromShell; - rpcInit.bits = TSDB_SHELL_VNODE_BITS; - rpcInit.numOfChanns = TSDB_MAX_VNODES; - rpcInit.sessionsPerChann = 16; - rpcInit.idMgmt = TAOS_ID_FREE; - rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S(); - rpcInit.idleTime = tsShellActivityTimer * 2000; - rpcInit.qhandle = tsRpcQhandle[0]; - //rpcInit.efp = vnodeSendVpeerCfgMsg; - - tsDnodeShellServer = taosOpenRpc(&rpcInit); - if (tsDnodeShellServer == NULL) { - dError("failed to init connection to shell"); - return -1; - } + rpcInit.fp = dnodeProcessMsgFromShell; + rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; + rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S(); + rpcInit.idleTime = tsShellActivityTimer * 2000; - - const int32_t size = NUM_OF_SESSIONS_PER_DNODE * sizeof(SShellObj); - tsDnodeShellList = (SShellObj *)malloc(size); - if (tsDnodeShellList == NULL) { - dError("failed to allocate shellObj, sessions:%d", NUM_OF_SESSIONS_PER_DNODE); - return -1; - } - memset(tsDnodeShellList, 0, size); - - // TODO re initialize tsRpcQhandle - if(taosOpenRpcChannWithQ(tsDnodeShellServer, 0, NUM_OF_SESSIONS_PER_DNODE, tsRpcQhandle) != TSDB_CODE_SUCCESS) { - dError("sessions:%d, failed to open shell", NUM_OF_SESSIONS_PER_DNODE); + tsDnodeShellServer = rpcOpen(&rpcInit); + if (tsDnodeShellServer == NULL) { + dError("failed to init connection from shell"); return -1; } - dError("sessions:%d, shell is opened", NUM_OF_SESSIONS_PER_DNODE); + dPrint("shell is opened"); return TSDB_CODE_SUCCESS; } void dnodeCleanupShell() { if (tsDnodeShellServer) { - taosCloseRpc(tsDnodeShellServer); + rpcClose(tsDnodeShellServer); } - for (int i = 0; i < NUM_OF_SESSIONS_PER_DNODE; ++i) { - dnodeFreeQInfoInQueue(tsDnodeShellList+i); - } - - //tfree(tsDnodeShellList); + dnodeFreeQInfos(); } -int vnodeSendQueryRspMsg(SShellObj *pObj, int code, void *qhandle) { - char *pMsg, *pStart; - int msgLen; - - pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_QUERY_RSP, 128); - if (pStart == NULL) return -1; - pMsg = pStart; - - *pMsg = code; - pMsg++; +void dnodeProcessQueryRequestCb(int code, void *pQInfo, void *pConn) { + int32_t contLen = sizeof(SQueryMeterRsp); + SQueryMeterRsp *queryRsp = (SQueryMeterRsp *) rpcMallocCont(contLen); + if (queryRsp == NULL) { + return; + } - *((uint64_t *)pMsg) = (uint64_t)qhandle; - pMsg += 8; + dTrace("conn:%p, query data, code:%d pQInfo:%p", pConn, code, pQInfo); - msgLen = pMsg - pStart; - taosSendMsgToPeer(pObj->thandle, pStart, msgLen); + queryRsp->code = htonl(code); + queryRsp->qhandle = (uint64_t) (pQInfo); - return msgLen; + rpcSendResponse(pConn, queryRsp, contLen); } -int32_t dnodeSendShellSubmitRspMsg(SShellObj *pObj, int32_t code, int32_t numOfPoints) { - char *pMsg, *pStart; - int msgLen; - - dTrace("code:%d numOfTotalPoints:%d", code, numOfPoints); - pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_SUBMIT_RSP, 128); - if (pStart == NULL) return -1; - pMsg = pStart; - - *pMsg = code; - pMsg++; +static void dnodeProcessQueryRequest(int8_t *pCont, int32_t contLen, void *pConn) { + atomic_fetch_add_32(&tsDnodeQueryReqNum, 1); + dTrace("conn:%p, start to query data", pConn); - *(int32_t *)pMsg = numOfPoints; - pMsg += sizeof(numOfPoints); - - msgLen = pMsg - pStart; - taosSendMsgToPeer(pObj->thandle, pStart, msgLen); - - return msgLen; + SQueryMeterMsg *pQuery = (SQueryMeterMsg *) pCont; + dnodeQueryData(pQuery, pConn, dnodeProcessQueryRequestCb); } -int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { - int ret, code = 0; - SQueryMeterMsg * pQueryMsg; - SMeterSidExtInfo **pSids = NULL; - int32_t incNumber = 0; - SSqlFunctionExpr * pExprs = NULL; - SSqlGroupbyExpr * pGroupbyExpr = NULL; - SMeterObj ** pMeterObjList = NULL; - - pQueryMsg = (SQueryMeterMsg *)pMsg; - if ((code = vnodeConvertQueryMeterMsg(pQueryMsg)) != TSDB_CODE_SUCCESS) { - goto _query_over; - } +void dnodeProcessRetrieveRequestCb(int32_t code, void *pQInfo, void *pConn) { + dTrace("conn:%p, retrieve data, code:%d", pConn, code); - if (pQueryMsg->numOfSids <= 0) { - dError("Invalid number of meters to query, numOfSids:%d", pQueryMsg->numOfSids); - code = TSDB_CODE_INVALID_QUERY_MSG; - goto _query_over; - } - - if (pQueryMsg->vnode >= TSDB_MAX_VNODES || pQueryMsg->vnode < 0) { - dError("qmsg:%p,vid:%d is out of range", pQueryMsg, pQueryMsg->vnode); - code = TSDB_CODE_INVALID_TABLE_ID; - goto _query_over; - } - - SVnodeObj *pVnode = &vnodeList[pQueryMsg->vnode]; - - if (pVnode->cfg.maxSessions == 0) { - dError("qmsg:%p,vid:%d is not activated yet", pQueryMsg, pQueryMsg->vnode); - vnodeSendVpeerCfgMsg(pQueryMsg->vnode); - code = TSDB_CODE_NOT_ACTIVE_VNODE; - goto _query_over; - } - - if (!(pVnode->accessState & TSDB_VN_READ_ACCCESS)) { - dError("qmsg:%p,vid:%d access not allowed", pQueryMsg, pQueryMsg->vnode); - code = TSDB_CODE_NO_READ_ACCESS; - goto _query_over; - } - - if (pVnode->meterList == NULL) { - dError("qmsg:%p,vid:%d has been closed", pQueryMsg, pQueryMsg->vnode); - code = TSDB_CODE_NOT_ACTIVE_VNODE; - goto _query_over; - } - - if (pQueryMsg->pSidExtInfo == 0) { - dError("qmsg:%p,SQueryMeterMsg wrong format", pQueryMsg); - code = TSDB_CODE_INVALID_QUERY_MSG; - goto _query_over; - } - - pSids = (SMeterSidExtInfo **)pQueryMsg->pSidExtInfo; - for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) { - if (pSids[i]->sid >= pVnode->cfg.maxSessions || pSids[i]->sid < 0) { - dError("qmsg:%p sid:%d out of range, valid range:[%d,%d]", pQueryMsg, pSids[i]->sid, 0, pVnode->cfg.maxSessions); - code = TSDB_CODE_INVALID_TABLE_ID; - goto _query_over; - } - } - - // todo optimize for single table query process - pMeterObjList = (SMeterObj **)calloc(pQueryMsg->numOfSids, sizeof(SMeterObj *)); - if (pMeterObjList == NULL) { - code = TSDB_CODE_SERV_OUT_OF_MEMORY; - goto _query_over; - } - - //add query ref for all meters. if any meter failed to add ref, rollback whole operation and go to error - pthread_mutex_lock(&pVnode->vmutex); - code = vnodeIncQueryRefCount(pQueryMsg, pSids, pMeterObjList, &incNumber); - assert(incNumber <= pQueryMsg->numOfSids); - pthread_mutex_unlock(&pVnode->vmutex); - - if (code != TSDB_CODE_SUCCESS || pQueryMsg->numOfSids == 0) { // all the meters may have been dropped. - goto _query_over; - } - - pExprs = vnodeCreateSqlFunctionExpr(pQueryMsg, &code); - if (pExprs == NULL) { - assert(code != TSDB_CODE_SUCCESS); - goto _query_over; - } - - pGroupbyExpr = vnodeCreateGroupbyExpr(pQueryMsg, &code); - if ((pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) { - goto _query_over; - } - - if (pObj->qhandle) { - dTrace("QInfo:%p %s free qhandle", pObj->qhandle, __FUNCTION__); - void* qHandle = pObj->qhandle; - pObj->qhandle = NULL; - - vnodeDecRefCount(qHandle); + assert(pConn != NULL); + if (code != TSDB_CODE_SUCCESS) { + rpcSendSimpleRsp(pConn, code); + return; } - if (QUERY_IS_STABLE_QUERY(pQueryMsg->queryType)) { - pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code); - } else { - pObj->qhandle = vnodeQueryOnSingleTable(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code); + assert(pQInfo != NULL); + int32_t contLen = dnodeGetRetrieveDataSize(pQInfo); + SRetrieveMeterRsp *retrieveRsp = (SRetrieveMeterRsp *) rpcMallocCont(contLen); + if (retrieveRsp == NULL) { + rpcSendSimpleRsp(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY); + return; } -_query_over: - // if failed to add ref for all meters in this query, abort current query + code = dnodeGetRetrieveData(pQInfo, retrieveRsp); if (code != TSDB_CODE_SUCCESS) { - vnodeDecQueryRefCount(pQueryMsg, pMeterObjList, incNumber); + rpcSendSimpleRsp(pConn, TSDB_CODE_INVALID_QHANDLE); } - tfree(pQueryMsg->pSqlFuncExprs); - tfree(pMeterObjList); - ret = vnodeSendQueryRspMsg(pObj, code, pObj->qhandle); - - tfree(pQueryMsg->pSidExtInfo); - for(int32_t i = 0; i < pQueryMsg->numOfCols; ++i) { - vnodeFreeColumnInfo(&pQueryMsg->colList[i]); - } + retrieveRsp->numOfRows = htonl(retrieveRsp->numOfRows); + retrieveRsp->precision = htons(retrieveRsp->precision); + retrieveRsp->offset = htobe64(retrieveRsp->offset); + retrieveRsp->useconds = htobe64(retrieveRsp->useconds); - atomic_fetch_add_32(&tsDnodeSelectReqNum, 1); - return ret; + rpcSendResponse(pConn, retrieveRsp, contLen); } -void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { - -} +static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn) { + dTrace("conn:%p, start to retrieve data", pConn); -void dnodeProcessRetrieveRequestCb(int code, SRetrieveMeterRsp *result, SShellObj *pObj) { - if (pObj == NULL || result == NULL || code == TSDB_CODE_ACTION_IN_PROGRESS) { - return; - } + SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *) pCont; + dnodeRetrieveData(pRetrieve, pConn, dnodeProcessRetrieveRequestCb); } -static void dnodeProcessRetrieveRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj) { - SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *) pMsg; - dnodeRetrieveData(pRetrieve, msgLen, pObj, dnodeProcessRetrieveRequestCb); -} +void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pConn) { + assert(result != NULL); -void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pObj) { - if (pObj == NULL || result == NULL || result->code == TSDB_CODE_ACTION_IN_PROGRESS) { + if (result->code != 0) { + rpcSendSimpleRsp(pConn, result->code); return; } - SShellObj *pShellObj = (SShellObj *) pObj; - int32_t msgLen = sizeof(SShellSubmitRspMsg) + result->numOfFailedBlocks * sizeof(SShellSubmitRspBlock); - SShellSubmitRspMsg *submitRsp = (SShellSubmitRspMsg *) taosBuildRspMsgWithSize(pShellObj->thandle, - TSDB_MSG_TYPE_SUBMIT_RSP, msgLen); + int32_t contLen = sizeof(SShellSubmitRspMsg) + result->numOfFailedBlocks * sizeof(SShellSubmitRspBlock); + SShellSubmitRspMsg *submitRsp = (SShellSubmitRspMsg *) rpcMallocCont(contLen); if (submitRsp == NULL) { + rpcSendSimpleRsp(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY); return; } dTrace("code:%d, numOfRows:%d affectedRows:%d", result->code, result->numOfRows, result->affectedRows); - memcpy(submitRsp, result, msgLen); + memcpy(submitRsp, result, contLen); for (int i = 0; i < submitRsp->numOfFailedBlocks; ++i) { SShellSubmitRspBlock *block = &submitRsp->failedBlocks[i]; @@ -368,6 +189,7 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pObj) { } else if (block->code == TSDB_CODE_INVALID_TABLE_ID || block->code == TSDB_CODE_NOT_ACTIVE_TABLE) { dnodeSendMeterCfgMsg(block->vnode, block->sid); } + block->index = htonl(block->index); block->vnode = htonl(block->vnode); block->sid = htonl(block->sid); block->code = htonl(block->code); @@ -378,21 +200,21 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pObj) { submitRsp->failedRows = htonl(submitRsp->failedRows); submitRsp->numOfFailedBlocks = htonl(submitRsp->numOfFailedBlocks); - taosSendMsgToPeer(pShellObj->thandle, (int8_t*)submitRsp, msgLen); + rpcSendResponse(pConn, submitRsp, contLen); } -static void dnodeProcessShellSubmitRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj) { - SShellSubmitMsg *pSubmit = (SShellSubmitMsg *) pMsg; - dnodeWriteData(pSubmit, pObj, dnodeProcessShellSubmitRequestCb); - atomic_fetch_add_32(&tsDnodeInsertReqNum, 1); +static void dnodeProcessShellSubmitRequest(int8_t *pCont, int32_t contLen, void *pConn) { + SShellSubmitMsg *pSubmit = (SShellSubmitMsg *) pCont; + dnodeWriteData(pSubmit, pConn, dnodeProcessShellSubmitRequestCb); + atomic_fetch_add_32(&tsDnodeSubmitReqNum, 1); } SDnodeStatisInfo dnodeGetStatisInfo() { SDnodeStatisInfo info = {0}; if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) { info.httpReqNum = httpGetReqCount(); - info.selectReqNum = atomic_exchange_32(&tsDnodeSelectReqNum, 0); - info.insertReqNum = atomic_exchange_32(&tsDnodeInsertReqNum, 0); + info.queryReqNum = atomic_exchange_32(&tsDnodeQueryReqNum, 0); + info.submitReqNum = atomic_exchange_32(&tsDnodeSubmitReqNum, 0); } return info; diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 75ef1be7413588952288333a07ba0995493a051b..588ba49483d8aecd4369996ee98e63805c8d26a5 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -19,18 +19,14 @@ #include "tlog.h" #include "dnodeWrite.h" -int32_t dnodeCheckTableExist(char *tableId) { - return 0; -} - -void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pShellObj, void (*callback)(SShellSubmitRspMsg *, void *)) { +void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)) { SShellSubmitRspMsg result = {0}; int32_t numOfSid = htonl(pSubmit->numOfSid); if (numOfSid <= 0) { dError("invalid num of tables:%d", numOfSid); result.code = TSDB_CODE_INVALID_QUERY_MSG; - callback(&result, pShellObj); + callback(&result, pConn); } //TODO: submit implementation diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 326cddcba9f5ba030ce0ee281fece25f9c526971..27594686d44c2c0bbc1f21322c3ae449012a03c5 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -25,8 +25,8 @@ extern "C" { #include "tsched.h" typedef struct { - int32_t selectReqNum; - int32_t insertReqNum; + int32_t queryReqNum; + int32_t submitReqNum; int32_t httpReqNum; } SDnodeStatisInfo; diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 028e6340cfb7eea10587817ad6a8ba433d7868c0..288daa6ba7d68ac65a6e64f69ec242912fc55e1d 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -238,6 +238,9 @@ extern "C" { #define TSQL_SO_ASC 1 #define TSQL_SO_DESC 0 +#define TSDB_SESSIONS_PER_VNODE (300) +#define TSDB_SESSIONS_PER_DNODE (TSDB_SESSIONS_PER_VNODE * TSDB_MAX_VNODES) + #ifdef __cplusplus } #endif diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 2c3d6da6f8f419bbbb93e4021b690f2cafe931ee..0b41a1b9499a9485df6c421f31816b5e404d5658 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -28,114 +28,118 @@ extern "C" { #include "taosdef.h" // message type -#define TSDB_MSG_TYPE_REG 1 -#define TSDB_MSG_TYPE_REG_RSP 2 -#define TSDB_MSG_TYPE_SUBMIT 3 -#define TSDB_MSG_TYPE_SUBMIT_RSP 4 -#define TSDB_MSG_TYPE_NWCHANGE 5 -#define TSDB_MSG_TYPE_NWCHANGE_RSP 6 -#define TSDB_MSG_TYPE_DELIVER 7 -#define TSDB_MSG_TYPE_DELIVER_RSP 8 - -#define TSDB_MSG_TYPE_CREATE 9 -#define TSDB_MSG_TYPE_CREATE_RSP 10 -#define TSDB_MSG_TYPE_REMOVE 11 -#define TSDB_MSG_TYPE_REMOVE_RSP 12 -#define TSDB_MSG_TYPE_VPEERS 13 -#define TSDB_MSG_TYPE_VPEERS_RSP 14 -#define TSDB_MSG_TYPE_FREE_VNODE 15 -#define TSDB_MSG_TYPE_FREE_VNODE_RSP 16 -#define TSDB_MSG_TYPE_VPEER_CFG 17 -#define TSDB_MSG_TYPE_VPEER_CFG_RSP 18 -#define TSDB_MSG_TYPE_METER_CFG 19 -#define TSDB_MSG_TYPE_METER_CFG_RSP 20 - -#define TSDB_MSG_TYPE_VPEER_FWD 21 -#define TSDB_MSG_TYPE_VPEER_FWD_RSP 22 -#define TSDB_MSG_TYPE_SYNC 23 -#define TSDB_MSG_TYPE_SYNC_RSP 24 - -#define TSDB_MSG_TYPE_INSERT 25 -#define TSDB_MSG_TYPE_INSERT_RSP 26 -#define TSDB_MSG_TYPE_QUERY 27 -#define TSDB_MSG_TYPE_QUERY_RSP 28 -#define TSDB_MSG_TYPE_RETRIEVE 29 -#define TSDB_MSG_TYPE_RETRIEVE_RSP 30 - -#define TSDB_MSG_TYPE_CONNECT 31 -#define TSDB_MSG_TYPE_CONNECT_RSP 32 -#define TSDB_MSG_TYPE_CREATE_ACCT 33 -#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 34 -#define TSDB_MSG_TYPE_CREATE_USER 35 -#define TSDB_MSG_TYPE_CREATE_USER_RSP 36 -#define TSDB_MSG_TYPE_DROP_ACCT 37 -#define TSDB_MSG_TYPE_DROP_ACCT_RSP 38 -#define TSDB_MSG_TYPE_DROP_USER 39 -#define TSDB_MSG_TYPE_DROP_USER_RSP 40 -#define TSDB_MSG_TYPE_ALTER_USER 41 -#define TSDB_MSG_TYPE_ALTER_USER_RSP 42 -#define TSDB_MSG_TYPE_CREATE_MNODE 43 -#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 44 -#define TSDB_MSG_TYPE_DROP_MNODE 45 -#define TSDB_MSG_TYPE_DROP_MNODE_RSP 46 -#define TSDB_MSG_TYPE_CREATE_DNODE 47 -#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 48 -#define TSDB_MSG_TYPE_DROP_DNODE 49 -#define TSDB_MSG_TYPE_DROP_DNODE_RSP 50 -#define TSDB_MSG_TYPE_CREATE_DB 51 -#define TSDB_MSG_TYPE_CREATE_DB_RSP 52 -#define TSDB_MSG_TYPE_DROP_DB 53 -#define TSDB_MSG_TYPE_DROP_DB_RSP 54 -#define TSDB_MSG_TYPE_USE_DB 55 -#define TSDB_MSG_TYPE_USE_DB_RSP 56 -#define TSDB_MSG_TYPE_CREATE_TABLE 57 -#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 58 - -#define TSDB_MSG_TYPE_DROP_TABLE 59 -#define TSDB_MSG_TYPE_DROP_TABLE_RSP 60 -#define TSDB_MSG_TYPE_METERINFO 61 -#define TSDB_MSG_TYPE_METERINFO_RSP 62 -#define TSDB_MSG_TYPE_METRIC_META 63 -#define TSDB_MSG_TYPE_METRIC_META_RSP 64 -#define TSDB_MSG_TYPE_SHOW 65 -#define TSDB_MSG_TYPE_SHOW_RSP 66 - -#define TSDB_MSG_TYPE_FORWARD 67 -#define TSDB_MSG_TYPE_FORWARD_RSP 68 - -#define TSDB_MSG_TYPE_CFG_PNODE 69 -#define TSDB_MSG_TYPE_CFG_PNODE_RSP 70 -#define TSDB_MSG_TYPE_CFG_MNODE 71 -#define TSDB_MSG_TYPE_CFG_MNODE_RSP 72 - -#define TSDB_MSG_TYPE_KILL_QUERY 73 -#define TSDB_MSG_TYPE_KILL_QUERY_RSP 74 -#define TSDB_MSG_TYPE_KILL_STREAM 75 -#define TSDB_MSG_TYPE_KILL_STREAM_RSP 76 -#define TSDB_MSG_TYPE_KILL_CONNECTION 77 -#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 78 - -#define TSDB_MSG_TYPE_ALTER_STREAM 79 -#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 80 -#define TSDB_MSG_TYPE_ALTER_TABLE 81 -#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 82 -#define TSDB_MSG_TYPE_ALTER_DB 83 -#define TSDB_MSG_TYPE_ALTER_DB_RSP 84 - -#define TSDB_MSG_TYPE_MULTI_METERINFO 85 -#define TSDB_MSG_TYPE_MULTI_METERINFO_RSP 86 - -#define TSDB_MSG_TYPE_HEARTBEAT 91 -#define TSDB_MSG_TYPE_HEARTBEAT_RSP 92 -#define TSDB_MSG_TYPE_STATUS 93 -#define TSDB_MSG_TYPE_STATUS_RSP 94 -#define TSDB_MSG_TYPE_GRANT 95 -#define TSDB_MSG_TYPE_GRANT_RSP 96 - -#define TSDB_MSG_TYPE_ALTER_ACCT 97 -#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 98 - -#define TSDB_MSG_TYPE_MAX 101 +#define TSDB_MSG_TYPE_REG 1 +#define TSDB_MSG_TYPE_REG_RSP 2 +#define TSDB_MSG_TYPE_DNODE_SUBMIT 3 +#define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4 +#define TSDB_MSG_TYPE_DNODE_QUERY 5 +#define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6 +#define TSDB_MSG_TYPE_DNODE_RETRIEVE 7 +#define TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP 8 +#define TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE 9 +#define TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE_RSP 10 +#define TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE 11 +#define TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE_RSP 12 +#define TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE 13 +#define TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE_RSP 14 +#define TSDB_MSG_TYPE_DNODE_CREATE_SUPER_TABLE 15 +#define TSDB_MSG_TYPE_DNODE_CREATE_SUPER_TABLE_RSP 16 +#define TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE 17 +#define TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE_RSP 18 +#define TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE 19 +#define TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE_RSP 20 +#define TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE 21 +#define TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE_RSP 22 +#define TSDB_MSG_TYPE_DNODE_REMOVE_SUPER_TABLE 23 +#define TSDB_MSG_TYPE_DNODE_REMOVE_SUPER_TABLE_RSP 24 +#define TSDB_MSG_TYPE_DNODE_ALTER_CHILD_TABLE 25 +#define TSDB_MSG_TYPE_DNODE_ALTER_CHILD_TABLE_RSP 26 +#define TSDB_MSG_TYPE_DNODE_ALTER_NORMAL_TABLE 27 +#define TSDB_MSG_TYPE_DNODE_ALTER_NORMAL_TABLE_RSP 28 +#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_TABLE 29 +#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_TABLE_RSP 30 +#define TSDB_MSG_TYPE_DNODE_ALTER_SUPER_TABLE 31 +#define TSDB_MSG_TYPE_DNODE_ALTER_SUPER_TABLE_RSP 32 +#define TSDB_MSG_TYPE_DNODE_VPEERS 33 +#define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 34 +#define TSDB_MSG_TYPE_DNODE_FREE_VNODE 35 +#define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 36 +#define TSDB_MSG_TYPE_DNODE_CFG 37 +#define TSDB_MSG_TYPE_DNODE_CFG_RSP 38 +#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 39 +#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_RSP 40 + +#define TSDB_MSG_TYPE_SDB_SYNC 41 +#define TSDB_MSG_TYPE_SDB_SYNC_RSP 42 +#define TSDB_MSG_TYPE_SDB_FORWARD 43 +#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 44 +#define TSDB_MSG_TYPE_CONNECT 51 +#define TSDB_MSG_TYPE_CONNECT_RSP 52 +#define TSDB_MSG_TYPE_CREATE_ACCT 53 +#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 54 +#define TSDB_MSG_TYPE_ALTER_ACCT 55 +#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 56 +#define TSDB_MSG_TYPE_DROP_ACCT 57 +#define TSDB_MSG_TYPE_DROP_ACCT_RSP 58 +#define TSDB_MSG_TYPE_CREATE_USER 59 +#define TSDB_MSG_TYPE_CREATE_USER_RSP 60 +#define TSDB_MSG_TYPE_ALTER_USER 61 +#define TSDB_MSG_TYPE_ALTER_USER_RSP 62 +#define TSDB_MSG_TYPE_DROP_USER 63 +#define TSDB_MSG_TYPE_DROP_USER_RSP 64 +#define TSDB_MSG_TYPE_CREATE_MNODE 65 +#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 66 +#define TSDB_MSG_TYPE_DROP_MNODE 67 +#define TSDB_MSG_TYPE_DROP_MNODE_RSP 68 +#define TSDB_MSG_TYPE_CREATE_DNODE 69 +#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 70 +#define TSDB_MSG_TYPE_DROP_DNODE 71 +#define TSDB_MSG_TYPE_DROP_DNODE_RSP 72 +#define TSDB_MSG_TYPE_ALTER_DNODE 73 +#define TSDB_MSG_TYPE_ALTER_DNODE_RSP 74 +#define TSDB_MSG_TYPE_CREATE_DB 75 +#define TSDB_MSG_TYPE_CREATE_DB_RSP 76 +#define TSDB_MSG_TYPE_DROP_DB 77 +#define TSDB_MSG_TYPE_DROP_DB_RSP 78 +#define TSDB_MSG_TYPE_USE_DB 79 +#define TSDB_MSG_TYPE_USE_DB_RSP 80 +#define TSDB_MSG_TYPE_ALTER_DB 81 +#define TSDB_MSG_TYPE_ALTER_DB_RSP 82 +#define TSDB_MSG_TYPE_CREATE_TABLE 83 +#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 84 +#define TSDB_MSG_TYPE_DROP_TABLE 85 +#define TSDB_MSG_TYPE_DROP_TABLE_RSP 86 +#define TSDB_MSG_TYPE_ALTER_TABLE 87 +#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 88 +#define TSDB_MSG_TYPE_VNODE_CFG 89 +#define TSDB_MSG_TYPE_VNODE_CFG_RSP 90 +#define TSDB_MSG_TYPE_TABLE_CFG 91 +#define TSDB_MSG_TYPE_TABLE_CFG_RSP 92 +#define TSDB_MSG_TYPE_TABLE_META 93 +#define TSDB_MSG_TYPE_TABLE_META_RSP 94 +#define TSDB_MSG_TYPE_STABLE_META 95 +#define TSDB_MSG_TYPE_STABLE_META_RSP 96 +#define TSDB_MSG_TYPE_MULTI_TABLE_META 97 +#define TSDB_MSG_TYPE_MULTI_TABLE_META_RSP 98 +#define TSDB_MSG_TYPE_ALTER_STREAM 99 +#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 100 +#define TSDB_MSG_TYPE_SHOW 101 +#define TSDB_MSG_TYPE_SHOW_RSP 102 +#define TSDB_MSG_TYPE_CFG_MNODE 103 +#define TSDB_MSG_TYPE_CFG_MNODE_RSP 104 +#define TSDB_MSG_TYPE_KILL_QUERY 105 +#define TSDB_MSG_TYPE_KILL_QUERY_RSP 106 +#define TSDB_MSG_TYPE_KILL_STREAM 107 +#define TSDB_MSG_TYPE_KILL_STREAM_RSP 108 +#define TSDB_MSG_TYPE_KILL_CONNECTION 109 +#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 110 +#define TSDB_MSG_TYPE_HEARTBEAT 111 +#define TSDB_MSG_TYPE_HEARTBEAT_RSP 112 +#define TSDB_MSG_TYPE_STATUS 113 +#define TSDB_MSG_TYPE_STATUS_RSP 114 +#define TSDB_MSG_TYPE_GRANT 115 +#define TSDB_MSG_TYPE_GRANT_RSP 116 +#define TSDB_MSG_TYPE_MAX 117 // IE type #define TSDB_IE_TYPE_SEC 1 @@ -287,13 +291,14 @@ typedef struct { } SShellSubmitMsg; typedef struct { + int32_t index; // index of failed block in submit blocks int32_t vnode; // vnode index of failed block int32_t sid; // table index of failed block int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table } SShellSubmitRspBlock; typedef struct { - int32_t code; // 0-success, 1-inprogress, > 1 error code + int32_t code; // 0-success, > 0 error code int32_t numOfRows; // number of records the client is trying to write int32_t affectedRows; // number of records actually written int32_t failedRows; // number of failed records (exclude duplicate records) @@ -301,7 +306,6 @@ typedef struct { SShellSubmitRspBlock *failedBlocks; } SShellSubmitRspMsg; - typedef struct SSchema { uint8_t type; char name[TSDB_COL_NAME_LEN]; diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 7738c2f28b11857ae7c8a5f3978665e31dca199d..84090da5cfe7a24cf3afbb4a03d224a763db40b7 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -50,16 +50,16 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { SVgObj * pVgroup; if (!sdbMaster) { - taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, TSDB_CODE_REDIRECT); + taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_REDIRECT); return 0; } int vnode = htonl(pCfg->vnode); int sid = htonl(pCfg->sid); - pStart = taosBuildRspMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, 64000); + pStart = taosBuildRspMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, 64000); if (pStart == NULL) { - taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); return 0; } @@ -100,15 +100,15 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { SVgObj * pVgroup = NULL; if (!sdbMaster) { - taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP, TSDB_CODE_REDIRECT); + taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_REDIRECT); return 0; } int vnode = htonl(pCfg->vnode); - pStart = taosBuildRspMsgToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP); + pStart = taosBuildRspMsgToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP); if (pStart == NULL) { - taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); return 0; } pMsg = pStart; @@ -142,7 +142,7 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) { STaosRsp *pRsp = (STaosRsp *)msg; if (!sdbMaster) { - taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEERS_RSP, TSDB_CODE_REDIRECT); + taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP, TSDB_CODE_REDIRECT); return 0; } @@ -172,19 +172,19 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) { } void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj) { - if (msgType == TSDB_MSG_TYPE_METER_CFG) { + if (msgType == TSDB_MSG_TYPE_TABLE_CFG) { mgmtProcessMeterCfgMsg(content, msgLen - sizeof(SIntMsg), pObj); - } else if (msgType == TSDB_MSG_TYPE_VPEER_CFG) { + } else if (msgType == TSDB_MSG_TYPE_VNODE_CFG) { mgmtProcessVpeerCfgMsg(content, msgLen - sizeof(SIntMsg), pObj); - } else if (msgType == TSDB_MSG_TYPE_CREATE_RSP) { + } else if (msgType == TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE_RSP) { mgmtProcessCreateRsp(content, msgLen - sizeof(SIntMsg), pObj); } else if (msgType == TSDB_MSG_TYPE_REMOVE_RSP) { // do nothing - } else if (msgType == TSDB_MSG_TYPE_VPEERS_RSP) { + } else if (msgType == TSDB_MSG_TYPE_DNODE_VPEERS_RSP) { mgmtProcessVPeersRsp(content, msgLen - sizeof(SIntMsg), pObj); - } else if (msgType == TSDB_MSG_TYPE_FREE_VNODE_RSP) { + } else if (msgType == TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP) { mgmtProcessFreeVnodeRsp(content, msgLen - sizeof(SIntMsg), pObj); - } else if (msgType == TSDB_MSG_TYPE_CFG_PNODE_RSP) { + } else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) { // do nothing; } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) { // do nothing; @@ -243,7 +243,7 @@ int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int continue; } - int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000); + int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE, 64000); if (pStart == NULL) { continue; } @@ -267,7 +267,7 @@ int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) { continue; } - int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000); + int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE, 64000); if (pStart == NULL) { continue; } @@ -291,7 +291,7 @@ int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) { continue; } - int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000); + int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE, 64000); if (pStart == NULL) { continue; } @@ -322,7 +322,7 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) { pObj = mgmtGetDnode(pVgroup->vnodeGid[i].ip); if (pObj == NULL) continue; - pStart = taosBuildReqMsgToDnode(pObj, TSDB_MSG_TYPE_REMOVE); + pStart = taosBuildReqMsgToDnode(pObj, TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE); if (pStart == NULL) continue; pMsg = pStart; @@ -428,7 +428,7 @@ int mgmtSendVPeersMsg(SVgObj *pVgroup) { mgmtUpdateDnode(pDnode); if (pDnode->thandle && pVgroup->numOfVnodes >= 1) { - pStart = taosBuildReqMsgToDnode(pDnode, TSDB_MSG_TYPE_VPEERS); + pStart = taosBuildReqMsgToDnode(pDnode, TSDB_MSG_TYPE_DNODE_VPEERS); if (pStart == NULL) continue; pMsg = mgmtBuildVpeersIe(pStart, pVgroup, pVgroup->vnodeGid[i].vnode); msgLen = pMsg - pStart; @@ -457,7 +457,7 @@ int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid) { return -1; } - pStart = taosBuildReqMsgToDnode(pDnode, TSDB_MSG_TYPE_FREE_VNODE); + pStart = taosBuildReqMsgToDnode(pDnode, TSDB_MSG_TYPE_DNODE_FREE_VNODE); if (pStart == NULL) return -1; pMsg = pStart; @@ -539,7 +539,7 @@ int mgmtSendCfgDnodeMsg(char *cont) { } #ifdef CLUSTER - pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_CFG_PNODE); + pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_DNODE_CFG); if (pStart == NULL) return TSDB_CODE_NODE_OFFLINE; pMsg = pStart; diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index aea091b7525d1c000f0f74a694d06a1fe804371b..9619804bf1d6676e4be88ad77c02d074453be91a 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -138,7 +138,7 @@ static uint32_t mgmtSetMeterTagValue(char *pTags, STabObj *pMetric, STabObj *pMe } static char *mgmtAllocMsg(SConnObj *pConn, int32_t size, char **pMsg, STaosRsp **pRsp) { - char *pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, size); + char *pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, size); if (pStart == NULL) return 0; *pMsg = pStart; *pRsp = (STaosRsp *)(*pMsg); @@ -147,7 +147,7 @@ static char *mgmtAllocMsg(SConnObj *pConn, int32_t size, char **pMsg, STaosRsp * } static char *mgmtForMultiAllocMsg(SConnObj *pConn, int32_t size, char **pMsg, STaosRsp **pRsp) { - char *pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_MULTI_METERINFO_RSP, size); + char *pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_MULTI_TABLE_META_RSP, size); if (pStart == NULL) return 0; *pMsg = pStart; *pRsp = (STaosRsp *)(*pMsg); @@ -197,7 +197,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { if (pDb == NULL || (pDb != NULL && pDb->dropStatus != TSDB_DB_STATUS_READY)) { if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) { - taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); return 0; } @@ -212,13 +212,13 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { // on demand create table from super table if meter does not exists if (pMeterObj == NULL && pInfo->createFlag == 1) { // write operation needs to redirect to master mnode - if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_METERINFO_RSP) != 0) { + if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_TABLE_META_RSP) != 0) { return 0; } SCreateTableMsg *pCreateMsg = calloc(1, sizeof(SCreateTableMsg) + sizeof(STagData)); if (pCreateMsg == NULL) { - taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); return 0; } @@ -241,7 +241,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { if (code != TSDB_CODE_SUCCESS) { if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) { - taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); return 0; } @@ -255,7 +255,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { } if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) { - taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); return 0; } @@ -364,7 +364,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { char *pNewMsg; if ((pStart = mgmtForMultiAllocMsg(pConn, size, &pNewMsg, &pRsp)) == NULL) { - taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); return 0; } @@ -393,7 +393,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { if (NULL == pMsgHdr) { char* pTmp = pStart - sizeof(STaosHeader); tfree(pTmp); - taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); break; } @@ -523,9 +523,9 @@ int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); if (pMetric == NULL || (pDb != NULL && pDb->dropStatus != TSDB_DB_STATUS_READY)) { - pStart = taosBuildRspMsg(pConn->thandle, TSDB_MSG_TYPE_METRIC_META_RSP); + pStart = taosBuildRspMsg(pConn->thandle, TSDB_MSG_TYPE_STABLE_META_RSP); if (pStart == NULL) { - taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METRIC_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_STABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); return 0; } @@ -541,7 +541,7 @@ int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { } else { msgLen = mgmtRetrieveMetricMeta(pConn, &pStart, pSuperTableMetaMsg); if (msgLen <= 0) { - taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METRIC_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_STABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); return 0; } } @@ -999,14 +999,14 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) { */ if (pRetrieve->qhandle != pConn->qhandle) { mError("retrieve:%p, qhandle:%p is not matched with saved:%p", pRetrieve, pRetrieve->qhandle, pConn->qhandle); - taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED); + taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED); return -1; } pShow = (SShowObj *)pRetrieve->qhandle; if (pShow->signature != (void *)pShow) { mError("pShow:%p, signature:%p, query memory is corrupted", pShow, pShow->signature); - taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED); + taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED); return -1; } else { if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { @@ -1024,9 +1024,9 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) { size = pShow->rowSize * rowsToRead; } - pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, size + 100); + pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, size + 100); if (pStart == NULL) { - taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); return 0; } @@ -1206,7 +1206,7 @@ int mgmtProcessCfgDnodeMsg(char *pMsg, int msgLen, SConnObj *pConn) { code = mgmtSendCfgDnodeMsg(pMsg); } - taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CFG_PNODE_RSP, code); + taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_CFG_RSP, code); if (code == 0) mTrace("dnode:%s is configured by %s", pCfg->ip, pConn->pUser->user); @@ -1464,9 +1464,9 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { int contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead); // read-only request can be executed concurrently - if ((pMsg->msgType == TSDB_MSG_TYPE_METERINFO && (!mgmtCheckMeterMetaMsgType(cont))) || - pMsg->msgType == TSDB_MSG_TYPE_METRIC_META || pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE || - pMsg->msgType == TSDB_MSG_TYPE_SHOW || pMsg->msgType == TSDB_MSG_TYPE_MULTI_METERINFO) { + if ((pMsg->msgType == TSDB_MSG_TYPE_TABLE_META && (!mgmtCheckMeterMetaMsgType(cont))) || + pMsg->msgType == TSDB_MSG_TYPE_STABLE_META || pMsg->msgType == TSDB_MSG_TYPE_DNODE_RETRIEVE || + pMsg->msgType == TSDB_MSG_TYPE_SHOW || pMsg->msgType == TSDB_MSG_TYPE_MULTI_TABLE_META) { (*mgmtProcessShellMsg[pMsg->msgType])(cont, contLen, pConn); } else { if (mgmtProcessShellMsg[pMsg->msgType]) { @@ -1498,9 +1498,9 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { } void mgmtInitProcessShellMsg() { - mgmtProcessShellMsg[TSDB_MSG_TYPE_METERINFO] = mgmtProcessMeterMetaMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_METRIC_META] = mgmtProcessMetricMetaMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_MULTI_METERINFO] = mgmtProcessMultiMeterMetaMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_TABLE_META] = mgmtProcessMeterMetaMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_STABLE_META] = mgmtProcessMetricMetaMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_MULTI_TABLE_META] = mgmtProcessMultiMeterMetaMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_DB] = mgmtProcessCreateDbMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_DB] = mgmtProcessAlterDbMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_USER] = mgmtProcessCreateUserMsg; @@ -1516,7 +1516,7 @@ void mgmtInitProcessShellMsg() { mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_TABLE] = mgmtProcessAlterTableMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_USE_DB] = mgmtProcessUseDbMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_RETRIEVE] = mgmtProcessRetrieveMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_RETRIEVE] = mgmtProcessRetrieveMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_SHOW] = mgmtProcessShowMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg; @@ -1525,7 +1525,7 @@ void mgmtInitProcessShellMsg() { mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_MNODE] = mgmtProcessCreateMnodeMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_MNODE] = mgmtProcessDropMnodeMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CFG_MNODE] = mgmtProcessCfgMnodeMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_CFG_PNODE] = mgmtProcessCfgDnodeMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_CFG] = mgmtProcessCfgDnodeMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_QUERY] = mgmtProcessKillQueryMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_STREAM] = mgmtProcessKillStreamMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_CONNECTION] = mgmtProcessKillConnectionMsg; diff --git a/src/modules/monitor/src/monitorSystem.c b/src/modules/monitor/src/monitorSystem.c index a5985b4393101993051f811169e79d72722ef9fe..cac6c93588b707fdbb5ea60f8a7a3796e931bc10 100644 --- a/src/modules/monitor/src/monitorSystem.c +++ b/src/modules/monitor/src/monitorSystem.c @@ -334,10 +334,10 @@ int monitorBuildBandSql(char *sql) { int monitorBuildReqSql(char *sql) { SDnodeStatisInfo info; - info.httpReqNum = info.insertReqNum = info.selectReqNum = 0; + info.httpReqNum = info.submitReqNum = info.queryReqNum = 0; (*mnodeCountRequestFp)(&info); - return sprintf(sql, ", %d, %d, %d)", info.httpReqNum, info.selectReqNum, info.insertReqNum); + return sprintf(sql, ", %d, %d, %d)", info.httpReqNum, info.queryReqNum, info.submitReqNum); } int monitorBuildIoSql(char *sql) { diff --git a/src/rpc/src/tstring.c b/src/rpc/src/tstring.c index fe7b39fa741a0a0453c284851d9d9ffcd118b222..366d3443f6bedadf2c8251e2c3f8f017bdb78c90 100644 --- a/src/rpc/src/tstring.c +++ b/src/rpc/src/tstring.c @@ -13,112 +13,134 @@ * along with this program. If not, see . */ -char *taosMsg[] = {"null", - "registration", - "registration-rsp", - "submit", - "submit-rsp", - "nw-change", - "nw-change-rsp", - "deliver", - "deliver-rsp", +char *taosMsg[] = { + "null", + "registration", + "registration-rsp", + "submit", + "submit-rsp", + "query", + "query-rsp", + "retrieve", + "retrieve-rsp", + "create-table", + "create-table-rsp", //10 - "create", - "create-rsp", - "remove", - "remove-rsp", - "vpeers", - "vpeers-rsp", - "free-vnode", - "free-vnode-rsp", - "vpeer-cfg", - "vpeer-cfg-rsp", - "meter-cfg", - "meter-cfg-rsp", + "create-normal-table", + "create-normal-table-rsp", + "create-stream-table", + "create-stream-table-rsp", + "create-super-table", + "create-super-table-rsp", + "remove-table", + "remove-table-rsp", + "remove-normal-table", + "remove-normal-table-rsp", //20 - "vpeer-fwd", - "vpeer-fwd-rsp", - "sync", - "sync-rsp", + "remove-stream-table", + "remove-stream-table-rsp", + "remove-super-table", + "remove-super-table-rsp", + "alter-table", + "alter-table-rsp", + "alter-normal-table", + "alter-normal-table-rsp", + "alter-stream-table", + "alter-stream-table-rsp", //30 - "insert", - "insert-rsp", - "query", - "query-rsp", - "retrieve", - "retrieve-rsp", + "alter-super-table", + "alter-super-table-rsp", + "vpeers", + "vpeers-rsp", + "free-vnode", + "free-vnode-rsp", + "cfg-dnode", + "cfg-dnode-rsp", + "alter-stream", + "alter-stream-rsp", //40 - "connect", - "connect-rsp", - "create-acct", - "create-acct-rsp", - "create-user", - "create-user-rsp", - "drop-acct", - "drop-acct-rsp", - "drop-user", - "drop-user-rsp", - "alter-user", - "alter-user-rsp", - "create-mnode", - "create-mnode-rsp", - "drop-mnode", - "drop-mnode-rsp", - "create-dnode", - "create-dnode-rsp", - "drop-dnode", - "drop-dnode-rsp", - "create-db", - "create-db-rsp", - "drop-db", - "drop-db-rsp", - "use-db", - "use-db-rsp", - "create-table", - "create-table-rsp", - "drop-table", - "drop-table-rsp", - "meter-info", - "meter-info-rsp", - "metric-meta", - "metric-meta-rsp", - "show", - "show-rsp", + "sync", + "sync-rsp", + "forward", + "forward-rsp", + "", + "", + "", + "", + "", + "", //50 - "forward", - "forward-rsp", + "connect", + "connect-rsp", + "create-acct", + "create-acct-rsp", + "alter-acct", + "alter-acct-rsp", + "drop-acct", + "drop-acct-rsp", + "create-user", + "create-user-rsp", //60 - "cfg-dnode", - "cfg-dnode-rsp", - "cfg-mnode", - "cfg-mnode-rsp", + "alter-user", + "alter-user-rsp", + "drop-user", + "drop-user-rsp", + "create-mnode", + "create-mnode-rsp", + "drop-mnode", + "drop-mnode-rsp", + "create-dnode", + "create-dnode-rsp", //70 - "kill-query", - "kill-query-rsp", - "kill-stream", - "kill-stream-rsp", - "kill-connection", - "kill-connectoin-rsp", // 78 - "alter-stream", - "alter-stream-rsp", - "alter-table", - "alter-table-rsp", + "drop-dnode", + "drop-dnode-rsp", + "alter-dnode", + "alter-dnode-rsp", + "create-db", + "create-db-rsp", + "drop-db", + "drop-db-rsp", + "use-db", + "use-db-rsp", //80 - "", - "", - "", - "", - "", - "", - "", - "", + "alter-db", + "alter-db-rsp", + "create-table", + "create-table-rsp", + "drop-table", + "drop-table-rsp", + "alter-table", + "alter-table-rsp", + "cfg-vnode", + "cfg-vnode-rsp", //90 - "heart-beat", // 91 - "heart-beat-rsp", - "status", - "status-rsp", - "grant", - "grant-rsp", - "alter-acct", - "alter-acct-rsp", - "invalid"}; + "cfg-table", + "cfg-table-rsp", + "table-meta", + "table-meta-rsp", + "super-table-meta", + "super-stable-meta-rsp", + "multi-table-meta", + "multi-table-meta-rsp", + "alter-stream", + "alter-stream-rsp", //100 + + "show", + "show-rsp", + "cfg-mnode", + "cfg-mnode-rsp", + "kill-query", + "kill-query-rsp", + "kill-stream", + "kill-stream-rsp", + "kill-connection", + "kill-connectoin-rsp", //110 + + "heart-beat", + "heart-beat-rsp", + "status", + "status-rsp", + "grant", + "grant-rsp", + "max" +}; \ No newline at end of file diff --git a/src/util/inc/tsched.h b/src/util/inc/tsched.h index 827ecbbb421b78c7d5140efc5c3be6e1edca4578..b46a0c455ab50a0b42decb7f700438507e9192b8 100644 --- a/src/util/inc/tsched.h +++ b/src/util/inc/tsched.h @@ -25,7 +25,7 @@ typedef struct _sched_msg { void (*tfp)(void *, void *); - char *msg; + int8_t *msg; void *ahandle; void *thandle; } SSchedMsg; diff --git a/src/vnode/CMakeLists.txt b/src/vnode/CMakeLists.txt index 92270a886f4cae4363e8c774bfa2622c3989b23b..5acf0483ec67c3d23573d7ac2dd42604e001276c 100644 --- a/src/vnode/CMakeLists.txt +++ b/src/vnode/CMakeLists.txt @@ -4,4 +4,8 @@ project(tsdb) add_subdirectory(common) -add_subdirectory(tsdb) \ No newline at end of file +add_subdirectory(tsdb) + +enable_testing() + +add_subdirectory(tests) \ No newline at end of file diff --git a/src/vnode/common/inc/dataformat.h b/src/vnode/common/inc/dataformat.h index dd8766f96ace09cb4ce69bd2c7eaa68d3151ad8a..b0389955fb18cbac8af4fa280166390d3ce6aead 100644 --- a/src/vnode/common/inc/dataformat.h +++ b/src/vnode/common/inc/dataformat.h @@ -46,6 +46,7 @@ typedef char * SDataCols; // ----------------- Data column structure // ---- operation on SDataRow; +#define TD_DATA_ROW_HEADER_SIZE sizeof(int32_t) #define TD_DATAROW_LEN(pDataRow) (*(int32_t *)(pDataRow)) #define TD_DATAROW_DATA(pDataRow) ((pDataRow) + sizeof(int32_t)) @@ -63,5 +64,9 @@ typedef char * SDataCols; #define TD_DATACOLS_NPOINTS(pDataCols) (*(int32_t *)(pDataCols + sizeof(int32_t))) // ---- +/** + * Get the maximum + */ +int32_t tdGetMaxDataRowSize(SSchema *pSchema); #endif // _TD_DATA_FORMAT_H_ diff --git a/src/vnode/common/src/dataformat.c b/src/vnode/common/src/dataformat.c index f0a2cefc218a5b9bd38c9eec3e2d6edf9e9a5f29..f09ea1445b089cdf2944a902c0c744ed73f6deef 100644 --- a/src/vnode/common/src/dataformat.c +++ b/src/vnode/common/src/dataformat.c @@ -1,3 +1,32 @@ #include #include "dataformat.h" + +int32_t tdGetMaxDataRowSize(SSchema *pSchema) { + int32_t nbytes = 0; + + for (int32_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++) + { + SColumn *pCol = TD_SCHEMA_COLUMN_AT(pSchema, i); + td_datatype_t type = TD_COLUMN_TYPE(pCol); + + nbytes += rowDataLen[type]; + + switch (type) + { + case TD_DATATYPE_VARCHAR: + nbytes += TD_COLUMN_BYTES(pCol); + break; + case TD_DATATYPE_NCHAR: + nbytes += 4 * TD_COLUMN_BYTES(pCol); + break; + case TD_DATATYPE_BINARY: + nbytes += TD_COLUMN_BYTES(pCol); + break; + } + } + + nbytes += TD_DATA_ROW_HEADER_SIZE; + + return nbytes; +} \ No newline at end of file diff --git a/src/vnode/tests/CMakeLists.txt b/src/vnode/tests/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..786fa9a66f8959ca6faa1278f59c903939218e05 --- /dev/null +++ b/src/vnode/tests/CMakeLists.txt @@ -0,0 +1,3 @@ +add_subdirectory(common) + +add_subdirectory(tsdb) \ No newline at end of file diff --git a/src/vnode/tests/common/CMakeLists.txt b/src/vnode/tests/common/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..093768be1a1398932d66825f2f2dcf4151ae1d65 --- /dev/null +++ b/src/vnode/tests/common/CMakeLists.txt @@ -0,0 +1,14 @@ +aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +message(STATUS "COMMON: ${SOURCE_LIST}") + +add_executable(commonTests ${SOURCE_LIST}) + +target_link_libraries(commonTests gtest gtest_main pthread common) + +add_test( + NAME + unit + COMMAND + ${CMAKE_CURRENT_BINARY_DIR}/commonTests +) \ No newline at end of file diff --git a/src/vnode/tests/common/commonDataTests.cpp b/src/vnode/tests/common/commonDataTests.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7becb8699c117d50e19d4f5a8ed8944a82da794e --- /dev/null +++ b/src/vnode/tests/common/commonDataTests.cpp @@ -0,0 +1,7 @@ +#include + +#include "dataformat.h" + +TEST(commonDataTests, createDataRow) { + EXPECT_EQ(1, 2/2); +} \ No newline at end of file diff --git a/src/vnode/tests/common/commonSChemaTests.cpp b/src/vnode/tests/common/commonSChemaTests.cpp new file mode 100644 index 0000000000000000000000000000000000000000..44fd384b61c567487477b3c60e24630c4b4e935c --- /dev/null +++ b/src/vnode/tests/common/commonSChemaTests.cpp @@ -0,0 +1,9 @@ +#include +#include +#include + +#include "schema.h" + +TEST(commonSchemaTests, createSchema) { + EXPECT_EQ(1, 2/2); +} \ No newline at end of file diff --git a/src/vnode/tests/tsdb/CMakeLists.txt b/src/vnode/tests/tsdb/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..a942dd917e7cb52c9c41162dffe7206d7f16e69c --- /dev/null +++ b/src/vnode/tests/tsdb/CMakeLists.txt @@ -0,0 +1,13 @@ +aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +message(STATUS "TSDB: ${SOURCE_LIST}") + +add_executable(tsdbTests ${SOURCE_LIST}) +target_link_libraries(tsdbTests gtest gtest_main pthread tsdb) + +add_test( + NAME + unit + COMMAND + ${CMAKE_CURRENT_BINARY_DIR}/tsdbTests +) \ No newline at end of file diff --git a/src/vnode/tests/tsdb/tsdbTests.cpp b/src/vnode/tests/tsdb/tsdbTests.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b3e6e22dada9035f841fe3211d70c026f72a39ef --- /dev/null +++ b/src/vnode/tests/tsdb/tsdbTests.cpp @@ -0,0 +1,15 @@ +#include +#include + +#include "tsdb.h" + +TEST(TsdbTest, createTsdbRepo) { + STSDBCfg *pCfg = (STSDBCfg *)malloc(sizeof(STSDBCfg)); + + pCfg->rootDir = "/var/lib/taos/"; + + int32_t err_num = 0; + + tsdb_repo_t *pRepo = tsdbCreateRepo(pCfg, &err_num); + ASSERT_EQ(pRepo, NULL); +} \ No newline at end of file diff --git a/src/vnode/tsdb/inc/tsdbCache.h b/src/vnode/tsdb/inc/tsdbCache.h index 049cdc0847d8da27f46fbf77d30d9d24fc3bc67a..b439be08aa34cfe3a0497458a66e10ca43c8138a 100644 --- a/src/vnode/tsdb/inc/tsdbCache.h +++ b/src/vnode/tsdb/inc/tsdbCache.h @@ -10,7 +10,7 @@ typedef struct { int64_t skey; // start key int64_t ekey; // end key - int32_t numOfRows // numOfRows + int32_t numOfRows; // numOfRows } STableCacheInfo; typedef struct _tsdb_cache_block { diff --git a/src/vnode/tsdb/src/tsdb.c b/src/vnode/tsdb/src/tsdb.c index 7e13e3183ad8ab33e9f975356359fd7d247f069c..031dd135c667403b5e500a34f729b221dbdf4eb6 100644 --- a/src/vnode/tsdb/src/tsdb.c +++ b/src/vnode/tsdb/src/tsdb.c @@ -1,9 +1,14 @@ #include #include #include +#include +#include +#include +#include // #include "taosdef.h" // #include "disk.h" +#include "tsdbFile.h" #include "tsdb.h" #include "tsdbCache.h" #include "tsdbMeta.h" @@ -33,6 +38,11 @@ typedef struct STSDBRepo { // Check the correctness of the TSDB configuration static int32_t tsdbCheckCfg(STSDBCfg *pCfg) { + if (pCfg->rootDir == NULL) return -1; + + if (access(pCfg->rootDir, F_OK|R_OK|W_OK) == -1) { + return -1; + } // TODO return 0; } @@ -42,6 +52,7 @@ tsdb_repo_t *tsdbCreateRepo(STSDBCfg *pCfg, int32_t *error) { err = tsdbCheckCfg(pCfg); if (err != 0) { // TODO: deal with the error here + return NULL; } STSDBRepo *pRepo = (STSDBRepo *)malloc(sizeof(STSDBRepo)); @@ -65,6 +76,12 @@ tsdb_repo_t *tsdbCreateRepo(STSDBCfg *pCfg, int32_t *error) { return NULL; } + // Create the Meta data file and data directory + + char *pTsdbMetaFName = tsdbGetFileName(pCfg->rootDir, "tsdb", TSDB_FILE_TYPE_META); + // int fd = open(pTsdbMetaFName, ) + // if (open) + return (tsdb_repo_t *)pRepo; }