提交 fb27589c 编写于 作者: J jtao1735

first draft

上级 0d09f6c0
...@@ -512,7 +512,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -512,7 +512,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
tscTrace("%p metric query is cancelled", pSql); tscTrace("%p metric query is cancelled", pSql);
} }
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char *pMsg, *pStart; char *pMsg, *pStart;
pStart = pSql->cmd.payload + tsRpcHeadSize; pStart = pSql->cmd.payload + tsRpcHeadSize;
...@@ -541,7 +541,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -541,7 +541,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen); pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen);
pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE; pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1365,7 +1365,7 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1365,7 +1365,7 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE; pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE;
pCmd->payloadLen = sizeof(SRetrieveTableMsg); pCmd->payloadLen = sizeof(SRetrieveTableMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
...@@ -2595,7 +2595,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { ...@@ -2595,7 +2595,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
void tscInitMsgsFp() { void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg; tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg; tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg; tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg; tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg; tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
......
...@@ -23,9 +23,9 @@ char *taosMsg[] = { ...@@ -23,9 +23,9 @@ char *taosMsg[] = {
"query-rsp", "query-rsp",
"retrieve", "retrieve",
"retrieve-rsp", "retrieve-rsp",
"create-table", "create-table",
"create-table-rsp", //10 "create-table-rsp", //10
"drop-table", "drop-table",
"drop-table-rsp", "drop-table-rsp",
"alter-table", "alter-table",
...@@ -100,13 +100,13 @@ char *taosMsg[] = { ...@@ -100,13 +100,13 @@ char *taosMsg[] = {
"kill-stream-rsp", "kill-stream-rsp",
"kill-connection", "kill-connection",
"kill-connectoin-rsp", "kill-connectoin-rsp",
"config-dnode",
"config-dnode-rsp",
"retrieve",
"retrieve-rsp",
"heart-beat", "heart-beat",
"heart-beat-rsp", //80 "heart-beat-rsp", //84
"",
"",
"",
"",
"", "",
"", "",
"", "",
......
...@@ -95,7 +95,7 @@ void dnodeRead(SRpcMsg *pMsg) { ...@@ -95,7 +95,7 @@ void dnodeRead(SRpcMsg *pMsg) {
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { if (pMsg->msgType == TSDB_MSG_TYPE_FETCH) {
pVnode = vnodeGetVnode(pHead->vgId); pVnode = vnodeGetVnode(pHead->vgId);
} else { } else {
pVnode = vnodeAccquireVnode(pHead->vgId); pVnode = vnodeAccquireVnode(pHead->vgId);
......
...@@ -35,9 +35,9 @@ static int32_t tsDnodeQueryReqNum = 0; ...@@ -35,9 +35,9 @@ static int32_t tsDnodeQueryReqNum = 0;
static int32_t tsDnodeSubmitReqNum = 0; static int32_t tsDnodeSubmitReqNum = 0;
int32_t dnodeInitShell() { int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeWrite; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeWrite;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeRead; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeRead;
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
......
...@@ -28,14 +28,12 @@ extern "C" { ...@@ -28,14 +28,12 @@ extern "C" {
#include "trpc.h" #include "trpc.h"
// message type // message type
#define TSDB_MSG_TYPE_REG 1
#define TSDB_MSG_TYPE_REG_RSP 2
#define TSDB_MSG_TYPE_SUBMIT 3 #define TSDB_MSG_TYPE_SUBMIT 3
#define TSDB_MSG_TYPE_SUBMIT_RSP 4 #define TSDB_MSG_TYPE_SUBMIT_RSP 4
#define TSDB_MSG_TYPE_QUERY 5 #define TSDB_MSG_TYPE_QUERY 5
#define TSDB_MSG_TYPE_QUERY_RSP 6 #define TSDB_MSG_TYPE_QUERY_RSP 6
#define TSDB_MSG_TYPE_RETRIEVE 7 #define TSDB_MSG_TYPE_FETCH 7
#define TSDB_MSG_TYPE_RETRIEVE_RSP 8 #define TSDB_MSG_TYPE_FETCH_RSP 8
// message from mnode to dnode // message from mnode to dnode
#define TSDB_MSG_TYPE_MD_CREATE_TABLE 9 #define TSDB_MSG_TYPE_MD_CREATE_TABLE 9
...@@ -74,8 +72,6 @@ extern "C" { ...@@ -74,8 +72,6 @@ extern "C" {
#define TSDB_MSG_TYPE_CM_CREATE_DNODE_RSP 46 #define TSDB_MSG_TYPE_CM_CREATE_DNODE_RSP 46
#define TSDB_MSG_TYPE_CM_DROP_DNODE 47 #define TSDB_MSG_TYPE_CM_DROP_DNODE 47
#define TSDB_MSG_TYPE_CM_DROP_DNODE_RSP 48 #define TSDB_MSG_TYPE_CM_DROP_DNODE_RSP 48
#define TSDB_MSG_TYPE_CM_CONFIG_DNODE TSDB_MSG_TYPE_MD_CONFIG_DNODE
#define TSDB_MSG_TYPE_CM_CONFIG_DNODE_RSP TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP
#define TSDB_MSG_TYPE_CM_CREATE_DB 49 #define TSDB_MSG_TYPE_CM_CREATE_DB 49
#define TSDB_MSG_TYPE_CM_CREATE_DB_RSP 50 #define TSDB_MSG_TYPE_CM_CREATE_DB_RSP 50
#define TSDB_MSG_TYPE_CM_DROP_DB 51 #define TSDB_MSG_TYPE_CM_DROP_DB 51
...@@ -103,11 +99,15 @@ extern "C" { ...@@ -103,11 +99,15 @@ extern "C" {
#define TSDB_MSG_TYPE_CM_KILL_QUERY 73 #define TSDB_MSG_TYPE_CM_KILL_QUERY 73
#define TSDB_MSG_TYPE_CM_KILL_QUERY_RSP 74 #define TSDB_MSG_TYPE_CM_KILL_QUERY_RSP 74
#define TSDB_MSG_TYPE_CM_KILL_STREAM 75 #define TSDB_MSG_TYPE_CM_KILL_STREAM 75
#define TSDB_MSG_TYPE_CM_KILL_STREAM_RSP 76 #define TSDB_MSG_TYPE_CM_KILL_STREAM_RSP 76
#define TSDB_MSG_TYPE_CM_KILL_CONN 77 #define TSDB_MSG_TYPE_CM_KILL_CONN 77
#define TSDB_MSG_TYPE_CM_KILL_CONN_RSP 78 #define TSDB_MSG_TYPE_CM_KILL_CONN_RSP 78
#define TSDB_MSG_TYPE_CM_HEARTBEAT 79 #define TSDB_MSG_TYPE_CM_CONFIG_DNODE 79
#define TSDB_MSG_TYPE_CM_HEARTBEAT_RSP 80 #define TSDB_MSG_TYPE_CM_CONFIG_DNODE_RSP 80
#define TSDB_MSG_TYPE_CM_RETRIEVE 81
#define TSDB_MSG_TYPE_CM_RETRIEVE_RSP 82
#define TSDB_MSG_TYPE_CM_HEARTBEAT 83
#define TSDB_MSG_TYPE_CM_HEARTBEAT_RSP 84
// message from dnode to mnode // message from dnode to mnode
#define TSDB_MSG_TYPE_DM_CONFIG_TABLE 91 #define TSDB_MSG_TYPE_DM_CONFIG_TABLE 91
......
...@@ -61,7 +61,7 @@ static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; ...@@ -61,7 +61,7 @@ static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0};
int32_t mgmtInitShell() { int32_t mgmtInitShell() {
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_RETRIEVE, mgmtProcessRetrieveMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_RETRIEVE, mgmtProcessRetrieveMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_HEARTBEAT, mgmtProcessHeartBeatMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_HEARTBEAT, mgmtProcessHeartBeatMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mgmtProcessUseMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mgmtProcessUseMsg);
...@@ -490,7 +490,7 @@ static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg) { ...@@ -490,7 +490,7 @@ static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg) {
return mgmtCheckTableMetaMsgReadOnly(pMsg); return mgmtCheckTableMetaMsgReadOnly(pMsg);
} }
if (pMsg->msgType == TSDB_MSG_TYPE_CM_STABLE_VGROUP || pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE || if (pMsg->msgType == TSDB_MSG_TYPE_CM_STABLE_VGROUP || pMsg->msgType == TSDB_MSG_TYPE_CM_RETRIEVE ||
pMsg->msgType == TSDB_MSG_TYPE_CM_SHOW || pMsg->msgType == TSDB_MSG_TYPE_CM_TABLES_META || pMsg->msgType == TSDB_MSG_TYPE_CM_SHOW || pMsg->msgType == TSDB_MSG_TYPE_CM_TABLES_META ||
pMsg->msgType == TSDB_MSG_TYPE_CM_CONNECT) { pMsg->msgType == TSDB_MSG_TYPE_CM_CONNECT) {
return true; return true;
......
...@@ -363,7 +363,7 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) ...@@ -363,7 +363,7 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg)
// connection type is application specific. // connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection // for TDengine, all the query, show commands shall have TCP connection
char type = pMsg->msgType; char type = pMsg->msgType;
if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE || if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE || type == TSDB_MSG_TYPE_FETCH ||
type == TSDB_MSG_TYPE_CM_STABLE_VGROUP || type == TSDB_MSG_TYPE_CM_TABLES_META || type == TSDB_MSG_TYPE_CM_STABLE_VGROUP || type == TSDB_MSG_TYPE_CM_TABLES_META ||
type == TSDB_MSG_TYPE_CM_SHOW ) type == TSDB_MSG_TYPE_CM_SHOW )
pContext->connType = RPC_CONN_TCPC; pContext->connType = RPC_CONN_TCPC;
...@@ -802,7 +802,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -802,7 +802,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
if (terrno == 0) { if (terrno == 0) {
if (pHead->msgType != TSDB_MSG_TYPE_REG && pHead->encrypt) { if (pHead->encrypt) {
// decrypt here // decrypt here
} }
......
...@@ -29,11 +29,11 @@ ...@@ -29,11 +29,11 @@
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet); static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
void vnodeInitReadFp(void) { void vnodeInitReadFp(void) {
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = vnodeProcessRetrieveMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
} }
int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) { int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) {
...@@ -76,7 +76,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont ...@@ -76,7 +76,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
return code; return code;
} }
static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) {
SRetrieveTableMsg *pRetrieve = pCont; SRetrieveTableMsg *pRetrieve = pCont;
void *pQInfo = (void*) htobe64(pRetrieve->qhandle); void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册