From fb27589c139d36859f945db0f9103cdd1d872fe4 Mon Sep 17 00:00:00 2001 From: jtao1735 Date: Mon, 4 May 2020 04:55:27 +0000 Subject: [PATCH] first draft --- src/client/src/tscServer.c | 8 ++++---- src/common/src/tmessage.c | 12 ++++++------ src/dnode/src/dnodeRead.c | 2 +- src/dnode/src/dnodeShell.c | 6 +++--- src/inc/taosmsg.h | 22 +++++++++++----------- src/mnode/src/mgmtShell.c | 4 ++-- src/rpc/src/rpcMain.c | 4 ++-- src/vnode/src/vnodeRead.c | 8 ++++---- 8 files changed, 33 insertions(+), 33 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d6f1c8f42a..2bef303df5 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -512,7 +512,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { tscTrace("%p metric query is cancelled", pSql); } -int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { +int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { char *pMsg, *pStart; pStart = pSql->cmd.payload + tsRpcHeadSize; @@ -541,7 +541,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen); - pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE; + pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH; return TSDB_CODE_SUCCESS; } @@ -1365,7 +1365,7 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE; + pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE; pCmd->payloadLen = sizeof(SRetrieveTableMsg); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { @@ -2595,7 +2595,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { void tscInitMsgsFp() { tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg; tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg; - tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg; + tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg; tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg; tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg; diff --git a/src/common/src/tmessage.c b/src/common/src/tmessage.c index 5a134df129..78ce17db75 100644 --- a/src/common/src/tmessage.c +++ b/src/common/src/tmessage.c @@ -23,9 +23,9 @@ char *taosMsg[] = { "query-rsp", "retrieve", "retrieve-rsp", + "create-table", "create-table-rsp", //10 - "drop-table", "drop-table-rsp", "alter-table", @@ -100,13 +100,13 @@ char *taosMsg[] = { "kill-stream-rsp", "kill-connection", "kill-connectoin-rsp", + "config-dnode", + "config-dnode-rsp", + "retrieve", + "retrieve-rsp", "heart-beat", - "heart-beat-rsp", //80 + "heart-beat-rsp", //84 - "", - "", - "", - "", "", "", "", diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index e52b59d20a..42c3b1faa0 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -95,7 +95,7 @@ void dnodeRead(SRpcMsg *pMsg) { pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { + if (pMsg->msgType == TSDB_MSG_TYPE_FETCH) { pVnode = vnodeGetVnode(pHead->vgId); } else { pVnode = vnodeAccquireVnode(pHead->vgId); diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 544d443bc0..0c9716ca52 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -35,9 +35,9 @@ static int32_t tsDnodeQueryReqNum = 0; static int32_t tsDnodeSubmitReqNum = 0; int32_t dnodeInitShell() { - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeWrite; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeRead; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeWrite; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeRead; int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index fd3105e3bb..dd9c8e1a52 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -28,14 +28,12 @@ extern "C" { #include "trpc.h" // message type -#define TSDB_MSG_TYPE_REG 1 -#define TSDB_MSG_TYPE_REG_RSP 2 #define TSDB_MSG_TYPE_SUBMIT 3 #define TSDB_MSG_TYPE_SUBMIT_RSP 4 #define TSDB_MSG_TYPE_QUERY 5 #define TSDB_MSG_TYPE_QUERY_RSP 6 -#define TSDB_MSG_TYPE_RETRIEVE 7 -#define TSDB_MSG_TYPE_RETRIEVE_RSP 8 +#define TSDB_MSG_TYPE_FETCH 7 +#define TSDB_MSG_TYPE_FETCH_RSP 8 // message from mnode to dnode #define TSDB_MSG_TYPE_MD_CREATE_TABLE 9 @@ -74,8 +72,6 @@ extern "C" { #define TSDB_MSG_TYPE_CM_CREATE_DNODE_RSP 46 #define TSDB_MSG_TYPE_CM_DROP_DNODE 47 #define TSDB_MSG_TYPE_CM_DROP_DNODE_RSP 48 -#define TSDB_MSG_TYPE_CM_CONFIG_DNODE TSDB_MSG_TYPE_MD_CONFIG_DNODE -#define TSDB_MSG_TYPE_CM_CONFIG_DNODE_RSP TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP #define TSDB_MSG_TYPE_CM_CREATE_DB 49 #define TSDB_MSG_TYPE_CM_CREATE_DB_RSP 50 #define TSDB_MSG_TYPE_CM_DROP_DB 51 @@ -103,11 +99,15 @@ extern "C" { #define TSDB_MSG_TYPE_CM_KILL_QUERY 73 #define TSDB_MSG_TYPE_CM_KILL_QUERY_RSP 74 #define TSDB_MSG_TYPE_CM_KILL_STREAM 75 -#define TSDB_MSG_TYPE_CM_KILL_STREAM_RSP 76 -#define TSDB_MSG_TYPE_CM_KILL_CONN 77 -#define TSDB_MSG_TYPE_CM_KILL_CONN_RSP 78 -#define TSDB_MSG_TYPE_CM_HEARTBEAT 79 -#define TSDB_MSG_TYPE_CM_HEARTBEAT_RSP 80 +#define TSDB_MSG_TYPE_CM_KILL_STREAM_RSP 76 +#define TSDB_MSG_TYPE_CM_KILL_CONN 77 +#define TSDB_MSG_TYPE_CM_KILL_CONN_RSP 78 +#define TSDB_MSG_TYPE_CM_CONFIG_DNODE 79 +#define TSDB_MSG_TYPE_CM_CONFIG_DNODE_RSP 80 +#define TSDB_MSG_TYPE_CM_RETRIEVE 81 +#define TSDB_MSG_TYPE_CM_RETRIEVE_RSP 82 +#define TSDB_MSG_TYPE_CM_HEARTBEAT 83 +#define TSDB_MSG_TYPE_CM_HEARTBEAT_RSP 84 // message from dnode to mnode #define TSDB_MSG_TYPE_DM_CONFIG_TABLE 91 diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index c86bf2a2dd..b09cb5aff2 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -61,7 +61,7 @@ static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; int32_t mgmtInitShell() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_RETRIEVE, mgmtProcessRetrieveMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_RETRIEVE, mgmtProcessRetrieveMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_HEARTBEAT, mgmtProcessHeartBeatMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mgmtProcessUseMsg); @@ -490,7 +490,7 @@ static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg) { return mgmtCheckTableMetaMsgReadOnly(pMsg); } - if (pMsg->msgType == TSDB_MSG_TYPE_CM_STABLE_VGROUP || pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE || + if (pMsg->msgType == TSDB_MSG_TYPE_CM_STABLE_VGROUP || pMsg->msgType == TSDB_MSG_TYPE_CM_RETRIEVE || pMsg->msgType == TSDB_MSG_TYPE_CM_SHOW || pMsg->msgType == TSDB_MSG_TYPE_CM_TABLES_META || pMsg->msgType == TSDB_MSG_TYPE_CM_CONNECT) { return true; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 36d74189f5..a02ee67b0a 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -363,7 +363,7 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) // connection type is application specific. // for TDengine, all the query, show commands shall have TCP connection char type = pMsg->msgType; - if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE || + if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE || type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_CM_STABLE_VGROUP || type == TSDB_MSG_TYPE_CM_TABLES_META || type == TSDB_MSG_TYPE_CM_SHOW ) pContext->connType = RPC_CONN_TCPC; @@ -802,7 +802,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { pHead->code = htonl(pHead->code); if (terrno == 0) { - if (pHead->msgType != TSDB_MSG_TYPE_REG && pHead->encrypt) { + if (pHead->encrypt) { // decrypt here } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 201214ded4..02a4e688e2 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -29,11 +29,11 @@ static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); -static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); +static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); void vnodeInitReadFp(void) { - vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; - vnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = vnodeProcessRetrieveMsg; + vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; + vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; } int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) { @@ -76,7 +76,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont return code; } -static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { +static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { SRetrieveTableMsg *pRetrieve = pCont; void *pQInfo = (void*) htobe64(pRetrieve->qhandle); memset(pRet, 0, sizeof(SRspRet)); -- GitLab