diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d6f1c8f42abdef4b71fd979dc3e4d022978d7e5f..2bef303df5172a8887aa4ca873d7cecc3118750c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -512,7 +512,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { tscTrace("%p metric query is cancelled", pSql); } -int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { +int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { char *pMsg, *pStart; pStart = pSql->cmd.payload + tsRpcHeadSize; @@ -541,7 +541,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen); - pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE; + pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH; return TSDB_CODE_SUCCESS; } @@ -1365,7 +1365,7 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE; + pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE; pCmd->payloadLen = sizeof(SRetrieveTableMsg); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { @@ -2595,7 +2595,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { void tscInitMsgsFp() { tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg; tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg; - tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg; + tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg; tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg; tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg; diff --git a/src/common/src/tmessage.c b/src/common/src/tmessage.c index 5a134df12904dad5e97ed132622db6463a11e831..78ce17db75828536b6da5815400a46494134f56f 100644 --- a/src/common/src/tmessage.c +++ b/src/common/src/tmessage.c @@ -23,9 +23,9 @@ char *taosMsg[] = { "query-rsp", "retrieve", "retrieve-rsp", + "create-table", "create-table-rsp", //10 - "drop-table", "drop-table-rsp", "alter-table", @@ -100,13 +100,13 @@ char *taosMsg[] = { "kill-stream-rsp", "kill-connection", "kill-connectoin-rsp", + "config-dnode", + "config-dnode-rsp", + "retrieve", + "retrieve-rsp", "heart-beat", - "heart-beat-rsp", //80 + "heart-beat-rsp", //84 - "", - "", - "", - "", "", "", "", diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index e52b59d20a38ad9242eb7103885b4e6a5c7691b5..42c3b1faa020c1fc7fe5b2527bf81cd458cc10df 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -95,7 +95,7 @@ void dnodeRead(SRpcMsg *pMsg) { pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { + if (pMsg->msgType == TSDB_MSG_TYPE_FETCH) { pVnode = vnodeGetVnode(pHead->vgId); } else { pVnode = vnodeAccquireVnode(pHead->vgId); diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 544d443bc0417c110f8c89b81c787119a6138206..0c9716ca52cc48edab7a5bb37b68aa15b2b35dc7 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -35,9 +35,9 @@ static int32_t tsDnodeQueryReqNum = 0; static int32_t tsDnodeSubmitReqNum = 0; int32_t dnodeInitShell() { - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeWrite; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeRead; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeWrite; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeRead; int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index fd3105e3bb9a6ed0189816ec9dc463d4b4d09337..dd9c8e1a52d9599ae80085dd7f3dc3ce15d0e5e1 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -28,14 +28,12 @@ extern "C" { #include "trpc.h" // message type -#define TSDB_MSG_TYPE_REG 1 -#define TSDB_MSG_TYPE_REG_RSP 2 #define TSDB_MSG_TYPE_SUBMIT 3 #define TSDB_MSG_TYPE_SUBMIT_RSP 4 #define TSDB_MSG_TYPE_QUERY 5 #define TSDB_MSG_TYPE_QUERY_RSP 6 -#define TSDB_MSG_TYPE_RETRIEVE 7 -#define TSDB_MSG_TYPE_RETRIEVE_RSP 8 +#define TSDB_MSG_TYPE_FETCH 7 +#define TSDB_MSG_TYPE_FETCH_RSP 8 // message from mnode to dnode #define TSDB_MSG_TYPE_MD_CREATE_TABLE 9 @@ -74,8 +72,6 @@ extern "C" { #define TSDB_MSG_TYPE_CM_CREATE_DNODE_RSP 46 #define TSDB_MSG_TYPE_CM_DROP_DNODE 47 #define TSDB_MSG_TYPE_CM_DROP_DNODE_RSP 48 -#define TSDB_MSG_TYPE_CM_CONFIG_DNODE TSDB_MSG_TYPE_MD_CONFIG_DNODE -#define TSDB_MSG_TYPE_CM_CONFIG_DNODE_RSP TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP #define TSDB_MSG_TYPE_CM_CREATE_DB 49 #define TSDB_MSG_TYPE_CM_CREATE_DB_RSP 50 #define TSDB_MSG_TYPE_CM_DROP_DB 51 @@ -103,11 +99,15 @@ extern "C" { #define TSDB_MSG_TYPE_CM_KILL_QUERY 73 #define TSDB_MSG_TYPE_CM_KILL_QUERY_RSP 74 #define TSDB_MSG_TYPE_CM_KILL_STREAM 75 -#define TSDB_MSG_TYPE_CM_KILL_STREAM_RSP 76 -#define TSDB_MSG_TYPE_CM_KILL_CONN 77 -#define TSDB_MSG_TYPE_CM_KILL_CONN_RSP 78 -#define TSDB_MSG_TYPE_CM_HEARTBEAT 79 -#define TSDB_MSG_TYPE_CM_HEARTBEAT_RSP 80 +#define TSDB_MSG_TYPE_CM_KILL_STREAM_RSP 76 +#define TSDB_MSG_TYPE_CM_KILL_CONN 77 +#define TSDB_MSG_TYPE_CM_KILL_CONN_RSP 78 +#define TSDB_MSG_TYPE_CM_CONFIG_DNODE 79 +#define TSDB_MSG_TYPE_CM_CONFIG_DNODE_RSP 80 +#define TSDB_MSG_TYPE_CM_RETRIEVE 81 +#define TSDB_MSG_TYPE_CM_RETRIEVE_RSP 82 +#define TSDB_MSG_TYPE_CM_HEARTBEAT 83 +#define TSDB_MSG_TYPE_CM_HEARTBEAT_RSP 84 // message from dnode to mnode #define TSDB_MSG_TYPE_DM_CONFIG_TABLE 91 diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index c86bf2a2dd1334238e632792ff4bb05ee9c0b2f6..b09cb5aff25510b94359f66b61cc7c781189cb2e 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -61,7 +61,7 @@ static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; int32_t mgmtInitShell() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_RETRIEVE, mgmtProcessRetrieveMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_RETRIEVE, mgmtProcessRetrieveMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_HEARTBEAT, mgmtProcessHeartBeatMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mgmtProcessUseMsg); @@ -490,7 +490,7 @@ static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg) { return mgmtCheckTableMetaMsgReadOnly(pMsg); } - if (pMsg->msgType == TSDB_MSG_TYPE_CM_STABLE_VGROUP || pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE || + if (pMsg->msgType == TSDB_MSG_TYPE_CM_STABLE_VGROUP || pMsg->msgType == TSDB_MSG_TYPE_CM_RETRIEVE || pMsg->msgType == TSDB_MSG_TYPE_CM_SHOW || pMsg->msgType == TSDB_MSG_TYPE_CM_TABLES_META || pMsg->msgType == TSDB_MSG_TYPE_CM_CONNECT) { return true; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 36d74189f5045c7a7d8321ca5c37844e5dbc7aa2..a02ee67b0ad9eef5396399b7889046e80bf5f538 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -363,7 +363,7 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) // connection type is application specific. // for TDengine, all the query, show commands shall have TCP connection char type = pMsg->msgType; - if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE || + if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE || type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_CM_STABLE_VGROUP || type == TSDB_MSG_TYPE_CM_TABLES_META || type == TSDB_MSG_TYPE_CM_SHOW ) pContext->connType = RPC_CONN_TCPC; @@ -802,7 +802,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { pHead->code = htonl(pHead->code); if (terrno == 0) { - if (pHead->msgType != TSDB_MSG_TYPE_REG && pHead->encrypt) { + if (pHead->encrypt) { // decrypt here } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 201214ded46c3a0f7da349a89fa5324dbe6eac0a..02a4e688e25bd8972dc998a6e7b86798aa1644cd 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -29,11 +29,11 @@ static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); -static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); +static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); void vnodeInitReadFp(void) { - vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; - vnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = vnodeProcessRetrieveMsg; + vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; + vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; } int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) { @@ -76,7 +76,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont return code; } -static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { +static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { SRetrieveTableMsg *pRetrieve = pCont; void *pQInfo = (void*) htobe64(pRetrieve->qhandle); memset(pRet, 0, sizeof(SRspRet));