提交 d1a4fa63 编写于 作者: H hzcheng

Merge branch '2.0' of https://github.com/taosdata/TDengine into 2.0

......@@ -545,7 +545,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
* There is not response callback function for submit response.
* The actual inserted number of points is the first number.
*/
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) {
if (pMsg->msgType == TSDB_MSG_TYPE_DNODE_SUBMIT_RSP) {
pRes->numOfRows += *(int32_t *)pRes->pRsp;
tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
......@@ -1464,7 +1464,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(pQueryInfo->type);
pSql->cmd.payloadLen = pMsg - pStart;
pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE;
pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_RETRIEVE;
return TSDB_CODE_SUCCESS;
}
......@@ -1503,7 +1503,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
// pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_SUBMIT;
tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip),
htons(pShellMsg->vnode));
......@@ -1900,7 +1900,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
pCmd->payloadLen = msgLen;
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_QUERY;
assert(msgLen + minMsgSize() <= size);
......@@ -2041,7 +2041,7 @@ int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(SCfgMsg);
pCmd->payloadLen = pMsg - pStart;
pCmd->msgType = TSDB_MSG_TYPE_CFG_PNODE;
pCmd->msgType = TSDB_MSG_TYPE_DNODE_CFG;
return TSDB_CODE_SUCCESS;
}
......@@ -2480,7 +2480,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
msgLen = pMsg - pStart;
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;
pCmd->msgType = TSDB_MSG_TYPE_DNODE_RETRIEVE;
return TSDB_CODE_SUCCESS;
}
......@@ -2660,7 +2660,7 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
msgLen = pMsg - pStart;
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_METERINFO;
pCmd->msgType = TSDB_MSG_TYPE_TABLE_META;
tfree(tmpData);
......@@ -2698,7 +2698,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tfree(tmpData);
pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiMeterInfoMsg);
pCmd->msgType = TSDB_MSG_TYPE_MULTI_METERINFO;
pCmd->msgType = TSDB_MSG_TYPE_MULTI_TABLE_META;
assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);
......@@ -2866,7 +2866,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
msgLen = pMsg - pStart;
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_METRIC_META;
pCmd->msgType = TSDB_MSG_TYPE_STABLE_META;
assert(msgLen + minMsgSize() <= size);
return TSDB_CODE_SUCCESS;
......
......@@ -118,7 +118,7 @@ bool tscQueryOnMetric(SSqlCmd* pCmd) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
return ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) == TSDB_QUERY_TYPE_STABLE_QUERY) &&
(pCmd->msgType == TSDB_MSG_TYPE_QUERY);
(pCmd->msgType == TSDB_MSG_TYPE_DNODE_QUERY);
}
bool tscQueryMetricTags(SQueryInfo* pQueryInfo) {
......
......@@ -25,10 +25,10 @@ extern "C" {
#include "tsched.h"
#include "dnode.h"
int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
int vnodeProcessRemoveMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
int dnodeProcessCreateTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
int dnodeProcessRemoveTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
void dnodeDistributeMsgFromMgmt(char *content, int msgLen, int msgType, SMgmtObj *pObj);
void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
extern void *dmQhandle;
......
......@@ -24,19 +24,37 @@ extern "C" {
#include <stdint.h>
#include "taosdef.h"
#include "taosmsg.h"
#include "dnodeShell.h"
void dnodeFreeQInfoInQueue(SShellObj *pShellObj);
/*
* Clear query information associated with this connection
*/
void dnodeFreeQInfo(void *pConn);
/*
* Clear all query informations
*/
void dnodeFreeQInfos();
/*
* handle query message, and the result is returned by callback function
*/
void dnodeQueryData(SQueryMeterMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn));
/*
* Dnode handle read messages
* The processing result is returned by callback function with pShellObj parameter
* Dispose retrieve msg, and the result will passed through callback function
*/
int32_t dnodeReadData(SQueryMeterMsg *msg, void *pShellObj, void (*callback)(SQueryMeterRsp *rspMsg, void *pShellObj));
typedef void (*SDnodeRetrieveCallbackFp)(int32_t code, void *pQInfo, void *pConn);
void dnodeRetrieveData(SRetrieveMeterMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp);
typedef void (*SDnodeRetrieveCallbackFp)(int32_t code, SRetrieveMeterRsp *pRetrieveRspMsg, void *pShellObj);
/*
* Fill retrieve result according to query info
*/
int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveMeterRsp *retrievalRsp);
void dnodeRetrieveData(SRetrieveMeterMsg *pMsg, int32_t msgLen, void *pShellObj, SDnodeRetrieveCallbackFp callback);
/*
* Get the size of retrieve result according to query info
*/
int32_t dnodeGetRetrieveDataSize(void *pQInfo);
#ifdef __cplusplus
}
......
......@@ -27,17 +27,12 @@ extern "C" {
/*
* Write data based on dnode, the detail result can be fetched from rsponse
* pSubmitMsg: Data to be written
* pShellObj: Used to pass a communication handle
* callback: Pass the write result through a callback function, possibly in a different thread space
* rsp: will not be freed by callback function
* pSubmit: Data to be written
* pConn: Communication handle
* callback: Pass the write result through a callback function, possibly in a different thread space
* rsp: will not be freed by callback function
*/
void dnodeWriteData(SShellSubmitMsg *pMsg, void *pShellObj, void (*callback)(SShellSubmitRspMsg *rsp, void *pShellObj));
/*
* Check if table already exists
*/
int32_t dnodeCheckTableExist(char *tableId);
void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn));
/*
* Create noraml table with specified configuration and open it
......
......@@ -22,28 +22,28 @@
#include "dnodeMgmt.h"
#include "taosmsg.h"
#include "tlog.h"
#include "trpc.h"
#include "tsched.h"
#include "tsystem.h"
#include "vnode.h"
#include "vnodeSystem.h"
#include "vnodeUtil.h"
#include "vnodeStatus.h"
SMgmtObj mgmtObj;
extern uint64_t tsCreatedTime;
int vnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj);
int dnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj);
int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen);
int vnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
int vnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj);
int vnodeProcessMeterCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj);
int vnodeProcessCfgDnodeRequest(char *cont, int contLen, SMgmtObj *pMgmtObj);
int vnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj);
int dnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
int dnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj);
int dnodeProcessTableCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj);
int dnodeProcessDnodeCfgRequest(char *cont, int contLen, SMgmtObj *pMgmtObj);
int dnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj);
void vnodeUpdateHeadFile(int vnode, int oldTables, int newTables);
void vnodeOpenVnode(int vnode);
void vnodeCleanUpOneVnode(int vnode);
static int (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, void *pConn);
static void dnodeInitProcessShellMsg();
char *taosBuildRspMsgToMnodeWithSizeImp(SMgmtObj *pObj, char type, int size) {
char *pStart = (char *)malloc(size);
if (pStart == NULL) {
......@@ -106,48 +106,40 @@ int taosSendSimpleRspToMnodeImp(SMgmtObj *pObj, char rsptype, char code) {
}
int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code) = taosSendSimpleRspToMnodeImp;
int32_t dnodeInitMgmtImp() { return 0; }
int32_t dnodeInitMgmtImp() {
dnodeInitProcessShellMsg();
return 0;
}
int32_t (*dnodeInitMgmt)() = dnodeInitMgmtImp;
void dnodeInitMgmtIpImp() {}
void (*dnodeInitMgmtIp)() = dnodeInitMgmtIpImp;
void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched) {
char msgType = *sched->msg;
char *content = sched->msg + 1;
int32_t msgType = *(int32_t*)(sched->msg);
int8_t *content = sched->msg + sizeof(int32_t);
dTrace("msg:%s is received from mgmt", taosMsg[(uint8_t)msgType]);
dnodeDistributeMsgFromMgmt(content, 0, msgType, 0);
dTrace("msg:%s is received from mgmt", taosMsg[msgType]);
dnodeDistributeMsgFromMgmt(content, 0, msgType, NULL);
free(sched->msg);
}
void dnodeDistributeMsgFromMgmt(char *content, int msgLen, int msgType, SMgmtObj *pObj) {
if (msgType == TSDB_MSG_TYPE_CREATE) {
vnodeProcessCreateMeterRequest(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_VPEERS) {
vnodeProcessVPeersMsg(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_VPEER_CFG_RSP) {
vnodeProcessVPeerCfgRsp(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_METER_CFG_RSP) {
vnodeProcessMeterCfgRsp(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_REMOVE) {
vnodeProcessRemoveMeterRequest(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_FREE_VNODE) {
vnodeProcessFreeVnodeRequest(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_CFG_PNODE) {
vnodeProcessCfgDnodeRequest(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM) {
vnodeProcessAlterStreamRequest(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_GRANT_RSP) {
// do nothing
void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn) {
if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
dError("invalid msg type:%d", msgType);
} else {
dError("%s is not processed", taosMsg[msgType]);
if (dnodeProcessShellMsgFp[msgType]) {
(*dnodeProcessShellMsgFp[msgType])(pConn, contLen, pConn);
} else {
dError("%s is not processed", taosMsg[msgType]);
}
}
}
int vnodeProcessMeterCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) {
int dnodeProcessTableCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) {
int code = *pMsg;
if (code == 0) {
......@@ -166,7 +158,7 @@ int vnodeProcessMeterCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) {
return 0;
}
int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
int dnodeProcessCreateTableRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
SCreateMsg *pCreate;
int code = 0;
int vid;
......@@ -196,12 +188,12 @@ int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
// }
_over:
taosSendSimpleRspToMnode(pObj, TSDB_MSG_TYPE_CREATE_RSP, code);
taosSendSimpleRspToMnode(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE_RSP, code);
return code;
}
int vnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
int dnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
SAlterStreamMsg *pAlter;
int code = 0;
int vid, sid;
......@@ -356,7 +348,7 @@ _create_over:
return code;
}
int vnodeProcessRemoveMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) {
int dnodeProcessRemoveTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) {
SMeterObj * pObj;
SRemoveMeterMsg *pRemove;
int code = 0;
......@@ -476,7 +468,7 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
return 0;
}
int vnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
int dnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
STaosRsp *pRsp;
pRsp = (STaosRsp *)msg;
......@@ -497,7 +489,7 @@ int vnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
return 0;
}
int vnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
int dnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
int code = 0;
code = vnodeProcessVPeerCfg(msg, msgLen, pMgmtObj);
......@@ -506,7 +498,7 @@ int vnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
STaosRsp * pRsp;
SVPeersMsg *pVPeersMsg = (SVPeersMsg *)msg;
pStart = taosBuildRspMsgToMnode(pMgmtObj, TSDB_MSG_TYPE_VPEERS_RSP);
pStart = taosBuildRspMsgToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP);
if (pStart == NULL) return -1;
pRsp = (STaosRsp *)pStart;
......@@ -519,7 +511,7 @@ int vnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
return code;
}
int vnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) {
int dnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) {
SFreeVnodeMsg *pFree;
pFree = (SFreeVnodeMsg *)pMsg;
......@@ -534,16 +526,16 @@ int vnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) {
int32_t code = vnodeRemoveVnode(pFree->vnode);
assert(code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS);
taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_FREE_VNODE_RSP, code);
taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP, code);
return 0;
}
int vnodeProcessCfgDnodeRequest(char *cont, int contLen, SMgmtObj *pMgmtObj) {
int dnodeProcessDnodeCfgRequest(char *cont, int contLen, SMgmtObj *pMgmtObj) {
SCfgMsg *pCfg = (SCfgMsg *)cont;
int code = tsCfgDynamicOptions(pCfg->config);
taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_CFG_PNODE_RSP, code);
taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_CFG_RSP, code);
return 0;
}
......@@ -554,7 +546,7 @@ void dnodeSendVpeerCfgMsg(int32_t vnode) {
SVpeerCfgMsg *pCfg;
SMgmtObj * pObj = &mgmtObj;
pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_VPEER_CFG);
pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_VNODE_CFG);
if (pStart == NULL) return;
pMsg = pStart;
......@@ -572,7 +564,7 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) {
SMeterCfgMsg *pCfg;
SMgmtObj * pObj = &mgmtObj;
pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_METER_CFG);
pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_TABLE_CFG);
if (pStart == NULL) return -1;
pMsg = pStart;
......@@ -585,3 +577,18 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) {
return taosSendMsgToMnode(pObj, pStart, msgLen);
}
void dnodeInitProcessShellMsg() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE] = dnodeProcessRemoveTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE] = dnodeProcessRemoveTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE] = dnodeProcessRemoveTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_VPEERS] = dnodeProcessVPeersMsg;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_FREE_VNODE] = dnodeProcessFreeVnodeRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp;
}
\ No newline at end of file
......@@ -17,45 +17,51 @@
#include "os.h"
#include "taoserror.h"
#include "tlog.h"
#include "dnodeWrite.h"
#include "dnode.h"
#include "dnodeRead.h"
#include "dnodeSystem.h"
void dnodeFreeQInfoInQueue(SShellObj *pShellObj) {
}
void dnodeFreeQInfo(void *pConn) {}
void dnodeFreeQInfos() {}
void dnodeQueryData(SQueryMeterMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn)) {
void *pQInfo = NULL;
int code = TSDB_CODE_SUCCESS;
callback(code, pConn, pQInfo);
}
void dnodeExecuteRetrieveData(SSchedMsg *pSched) {
static void dnodeExecuteRetrieveData(SSchedMsg *pSched) {
SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *)pSched->msg;
SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle;
SShellObj *pObj = (SShellObj *)pSched->ahandle;
SRetrieveMeterRsp result = {0};
/*
* in case of server restart, apps may hold qhandle created by server before restart,
* which is actually invalid, therefore, signature check is required.
*/
if (pRetrieve->qhandle != (uint64_t)pObj->qhandle) {
// if free flag is set, client wants to clean the resources
dError("QInfo:%p, qhandle:%p is not matched with saved:%p", pObj->qhandle, pRetrieve->qhandle, pObj->qhandle);
int32_t code = TSDB_CODE_INVALID_QHANDLE;
(*callback)(code, &result, pObj);
}
void *pConn = pSched->ahandle;
//examples
int32_t code = TSDB_CODE_INVALID_QHANDLE;
void *pQInfo = NULL; //get from pConn
(*callback)(code, NULL, pConn);
//TODO build response here
free(pSched->msg);
}
void dnodeRetrieveData(SRetrieveMeterMsg *pMsg, int32_t msgLen, void *pShellObj, SDnodeRetrieveCallbackFp callback) {
int8_t *msg = malloc(msgLen);
memcpy(msg, pMsg, msgLen);
void dnodeRetrieveData(SRetrieveMeterMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp) {
int8_t *msg = malloc(sizeof(pRetrieve));
memcpy(msg, pRetrieve, sizeof(pRetrieve));
SSchedMsg schedMsg;
schedMsg.msg = msg;
schedMsg.ahandle = pShellObj;
schedMsg.thandle = callback;
schedMsg.ahandle = pConn;
schedMsg.thandle = callbackFp;
schedMsg.fp = dnodeExecuteRetrieveData;
taosScheduleTask(tsQueryQhandle, &schedMsg);
}
int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveMeterRsp *retrievalRsp) {
return 0;
}
int32_t dnodeGetRetrieveDataSize(void *pQInfo) {}
......@@ -32,334 +32,155 @@
#include "dnodeUtil.h"
#include "dnodeWrite.h"
static void dnodeProcessRetrieveRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj);
static void dnodeProcessQueryRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj);
static void dnodeProcessShellSubmitRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj);
static void *tsDnodeShellServer = NULL;
static SShellObj *tsDnodeShellList = NULL;
static int32_t tsDnodeSelectReqNum = 0;
static int32_t tsDnodeInsertReqNum = 0;
static int32_t tsDnodeShellConns = 0;
#define NUM_OF_SESSIONS_PER_VNODE (300)
#define NUM_OF_SESSIONS_PER_DNODE (NUM_OF_SESSIONS_PER_VNODE * TSDB_MAX_VNODES)
void *dnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
int sid, vnode;
SShellObj *pObj = (SShellObj *)ahandle;
SIntMsg * pMsg = (SIntMsg *)msg;
uint32_t peerId, peerIp;
uint16_t peerPort;
char ipstr[20];
if (msg == NULL) {
if (pObj) {
pObj->thandle = NULL;
dTrace("QInfo:%p %s free qhandle", pObj->qhandle, __FUNCTION__);
dnodeFreeQInfoInQueue(pObj);
pObj->qhandle = NULL;
tsDnodeShellConns--;
dTrace("shell connection:%d is gone, shellConns:%d", pObj->sid, tsDnodeShellConns);
}
return NULL;
}
static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn);
static void dnodeProcessQueryRequest(int8_t *pCont, int32_t contLen, void *pConn);
static void dnodeProcessShellSubmitRequest(int8_t *pCont, int32_t contLen, void *pConn);
taosGetRpcConnInfo(thandle, &peerId, &peerIp, &peerPort, &vnode, &sid);
static void *tsDnodeShellServer = NULL;
static int32_t tsDnodeQueryReqNum = 0;
static int32_t tsDnodeSubmitReqNum = 0;
if (pObj == NULL) {
pObj = tsDnodeShellList + sid;
pObj->thandle = thandle;
pObj->sid = sid;
pObj->ip = peerIp;
tinet_ntoa(ipstr, peerIp);
tsDnodeShellConns--;
dTrace("shell connection:%d from ip:%s is created, shellConns:%d", sid, ipstr, tsDnodeShellConns);
} else {
if (pObj != tsDnodeShellList + sid) {
dError("shell connection:%d, pObj:%p is not matched with:%p", sid, pObj, tsDnodeShellList + sid);
return NULL;
}
}
void dnodeProcessMsgFromShell(int32_t msgType, void *pCont, int32_t contLen, void *handle, int32_t index) {
assert(handle != NULL);
dTrace("vid:%d sid:%d, msg:%s is received pConn:%p", vnode, sid, taosMsg[pMsg->msgType], thandle);
if (pCont == NULL || contLen == 0) {
dnodeFreeQInfo(handle);
dTrace("conn:%p, free query info", handle);
return;
}
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
dTrace("sid:%d, shell query msg is ignored since dnode not running", sid);
return pObj;
rpcSendSimpleRsp(handle, TSDB_CODE_NOT_READY);
dTrace("conn:%p, query msg is ignored since dnode not running", handle);
return;
}
if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) {
dnodeProcessQueryRequest(pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
} else if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
dnodeProcessRetrieveRequest(pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
} else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
dnodeProcessShellSubmitRequest(pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
dTrace("conn:%p, msg:%s is received", handle, taosMsg[msgType]);
if (msgType == TSDB_MSG_TYPE_DNODE_QUERY) {
dnodeProcessQueryRequest(pCont, contLen, handle);
} else if (msgType == TSDB_MSG_TYPE_DNODE_RETRIEVE) {
dnodeProcessRetrieveRequest(pCont, contLen, handle);
} else if (msgType == TSDB_MSG_TYPE_DNODE_SUBMIT) {
dnodeProcessShellSubmitRequest(pCont, contLen, handle);
} else {
dError("%s is not processed", taosMsg[pMsg->msgType]);
dError("conn:%p, msg:%s is not processed", handle, taosMsg[msgType]);
}
return pObj;
}
int32_t dnodeInitShell() {
SRpcInit rpcInit;
int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
numOfThreads = (1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0;
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
if (numOfThreads < 1) {
numOfThreads = 1;
}
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;
rpcInit.localPort = tsVnodeShellPort;
rpcInit.label = "DND-shell";
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;
rpcInit.localPort = tsVnodeShellPort;
rpcInit.label = "DND-shell";
rpcInit.numOfThreads = numOfThreads;
rpcInit.fp = dnodeProcessMsgFromShell;
rpcInit.bits = TSDB_SHELL_VNODE_BITS;
rpcInit.numOfChanns = TSDB_MAX_VNODES;
rpcInit.sessionsPerChann = 16;
rpcInit.idMgmt = TAOS_ID_FREE;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S();
rpcInit.idleTime = tsShellActivityTimer * 2000;
rpcInit.qhandle = tsRpcQhandle[0];
//rpcInit.efp = vnodeSendVpeerCfgMsg;
tsDnodeShellServer = taosOpenRpc(&rpcInit);
if (tsDnodeShellServer == NULL) {
dError("failed to init connection to shell");
return -1;
}
rpcInit.fp = dnodeProcessMsgFromShell;
rpcInit.sessions = TSDB_SESSIONS_PER_DNODE;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S();
rpcInit.idleTime = tsShellActivityTimer * 2000;
const int32_t size = NUM_OF_SESSIONS_PER_DNODE * sizeof(SShellObj);
tsDnodeShellList = (SShellObj *)malloc(size);
if (tsDnodeShellList == NULL) {
dError("failed to allocate shellObj, sessions:%d", NUM_OF_SESSIONS_PER_DNODE);
return -1;
}
memset(tsDnodeShellList, 0, size);
// TODO re initialize tsRpcQhandle
if(taosOpenRpcChannWithQ(tsDnodeShellServer, 0, NUM_OF_SESSIONS_PER_DNODE, tsRpcQhandle) != TSDB_CODE_SUCCESS) {
dError("sessions:%d, failed to open shell", NUM_OF_SESSIONS_PER_DNODE);
tsDnodeShellServer = rpcOpen(&rpcInit);
if (tsDnodeShellServer == NULL) {
dError("failed to init connection from shell");
return -1;
}
dError("sessions:%d, shell is opened", NUM_OF_SESSIONS_PER_DNODE);
dPrint("shell is opened");
return TSDB_CODE_SUCCESS;
}
void dnodeCleanupShell() {
if (tsDnodeShellServer) {
taosCloseRpc(tsDnodeShellServer);
rpcClose(tsDnodeShellServer);
}
for (int i = 0; i < NUM_OF_SESSIONS_PER_DNODE; ++i) {
dnodeFreeQInfoInQueue(tsDnodeShellList+i);
}
//tfree(tsDnodeShellList);
dnodeFreeQInfos();
}
int vnodeSendQueryRspMsg(SShellObj *pObj, int code, void *qhandle) {
char *pMsg, *pStart;
int msgLen;
pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_QUERY_RSP, 128);
if (pStart == NULL) return -1;
pMsg = pStart;
*pMsg = code;
pMsg++;
void dnodeProcessQueryRequestCb(int code, void *pQInfo, void *pConn) {
int32_t contLen = sizeof(SQueryMeterRsp);
SQueryMeterRsp *queryRsp = (SQueryMeterRsp *) rpcMallocCont(contLen);
if (queryRsp == NULL) {
return;
}
*((uint64_t *)pMsg) = (uint64_t)qhandle;
pMsg += 8;
dTrace("conn:%p, query data, code:%d pQInfo:%p", pConn, code, pQInfo);
msgLen = pMsg - pStart;
taosSendMsgToPeer(pObj->thandle, pStart, msgLen);
queryRsp->code = htonl(code);
queryRsp->qhandle = (uint64_t) (pQInfo);
return msgLen;
rpcSendResponse(pConn, queryRsp, contLen);
}
int32_t dnodeSendShellSubmitRspMsg(SShellObj *pObj, int32_t code, int32_t numOfPoints) {
char *pMsg, *pStart;
int msgLen;
dTrace("code:%d numOfTotalPoints:%d", code, numOfPoints);
pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_SUBMIT_RSP, 128);
if (pStart == NULL) return -1;
pMsg = pStart;
*pMsg = code;
pMsg++;
static void dnodeProcessQueryRequest(int8_t *pCont, int32_t contLen, void *pConn) {
atomic_fetch_add_32(&tsDnodeQueryReqNum, 1);
dTrace("conn:%p, start to query data", pConn);
*(int32_t *)pMsg = numOfPoints;
pMsg += sizeof(numOfPoints);
msgLen = pMsg - pStart;
taosSendMsgToPeer(pObj->thandle, pStart, msgLen);
return msgLen;
SQueryMeterMsg *pQuery = (SQueryMeterMsg *) pCont;
dnodeQueryData(pQuery, pConn, dnodeProcessQueryRequestCb);
}
int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
int ret, code = 0;
SQueryMeterMsg * pQueryMsg;
SMeterSidExtInfo **pSids = NULL;
int32_t incNumber = 0;
SSqlFunctionExpr * pExprs = NULL;
SSqlGroupbyExpr * pGroupbyExpr = NULL;
SMeterObj ** pMeterObjList = NULL;
pQueryMsg = (SQueryMeterMsg *)pMsg;
if ((code = vnodeConvertQueryMeterMsg(pQueryMsg)) != TSDB_CODE_SUCCESS) {
goto _query_over;
}
void dnodeProcessRetrieveRequestCb(int32_t code, void *pQInfo, void *pConn) {
dTrace("conn:%p, retrieve data, code:%d", pConn, code);
if (pQueryMsg->numOfSids <= 0) {
dError("Invalid number of meters to query, numOfSids:%d", pQueryMsg->numOfSids);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
}
if (pQueryMsg->vnode >= TSDB_MAX_VNODES || pQueryMsg->vnode < 0) {
dError("qmsg:%p,vid:%d is out of range", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_INVALID_TABLE_ID;
goto _query_over;
}
SVnodeObj *pVnode = &vnodeList[pQueryMsg->vnode];
if (pVnode->cfg.maxSessions == 0) {
dError("qmsg:%p,vid:%d is not activated yet", pQueryMsg, pQueryMsg->vnode);
vnodeSendVpeerCfgMsg(pQueryMsg->vnode);
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _query_over;
}
if (!(pVnode->accessState & TSDB_VN_READ_ACCCESS)) {
dError("qmsg:%p,vid:%d access not allowed", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_NO_READ_ACCESS;
goto _query_over;
}
if (pVnode->meterList == NULL) {
dError("qmsg:%p,vid:%d has been closed", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _query_over;
}
if (pQueryMsg->pSidExtInfo == 0) {
dError("qmsg:%p,SQueryMeterMsg wrong format", pQueryMsg);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
}
pSids = (SMeterSidExtInfo **)pQueryMsg->pSidExtInfo;
for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) {
if (pSids[i]->sid >= pVnode->cfg.maxSessions || pSids[i]->sid < 0) {
dError("qmsg:%p sid:%d out of range, valid range:[%d,%d]", pQueryMsg, pSids[i]->sid, 0, pVnode->cfg.maxSessions);
code = TSDB_CODE_INVALID_TABLE_ID;
goto _query_over;
}
}
// todo optimize for single table query process
pMeterObjList = (SMeterObj **)calloc(pQueryMsg->numOfSids, sizeof(SMeterObj *));
if (pMeterObjList == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _query_over;
}
//add query ref for all meters. if any meter failed to add ref, rollback whole operation and go to error
pthread_mutex_lock(&pVnode->vmutex);
code = vnodeIncQueryRefCount(pQueryMsg, pSids, pMeterObjList, &incNumber);
assert(incNumber <= pQueryMsg->numOfSids);
pthread_mutex_unlock(&pVnode->vmutex);
if (code != TSDB_CODE_SUCCESS || pQueryMsg->numOfSids == 0) { // all the meters may have been dropped.
goto _query_over;
}
pExprs = vnodeCreateSqlFunctionExpr(pQueryMsg, &code);
if (pExprs == NULL) {
assert(code != TSDB_CODE_SUCCESS);
goto _query_over;
}
pGroupbyExpr = vnodeCreateGroupbyExpr(pQueryMsg, &code);
if ((pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
goto _query_over;
}
if (pObj->qhandle) {
dTrace("QInfo:%p %s free qhandle", pObj->qhandle, __FUNCTION__);
void* qHandle = pObj->qhandle;
pObj->qhandle = NULL;
vnodeDecRefCount(qHandle);
assert(pConn != NULL);
if (code != TSDB_CODE_SUCCESS) {
rpcSendSimpleRsp(pConn, code);
return;
}
if (QUERY_IS_STABLE_QUERY(pQueryMsg->queryType)) {
pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code);
} else {
pObj->qhandle = vnodeQueryOnSingleTable(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code);
assert(pQInfo != NULL);
int32_t contLen = dnodeGetRetrieveDataSize(pQInfo);
SRetrieveMeterRsp *retrieveRsp = (SRetrieveMeterRsp *) rpcMallocCont(contLen);
if (retrieveRsp == NULL) {
rpcSendSimpleRsp(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY);
return;
}
_query_over:
// if failed to add ref for all meters in this query, abort current query
code = dnodeGetRetrieveData(pQInfo, retrieveRsp);
if (code != TSDB_CODE_SUCCESS) {
vnodeDecQueryRefCount(pQueryMsg, pMeterObjList, incNumber);
rpcSendSimpleRsp(pConn, TSDB_CODE_INVALID_QHANDLE);
}
tfree(pQueryMsg->pSqlFuncExprs);
tfree(pMeterObjList);
ret = vnodeSendQueryRspMsg(pObj, code, pObj->qhandle);
tfree(pQueryMsg->pSidExtInfo);
for(int32_t i = 0; i < pQueryMsg->numOfCols; ++i) {
vnodeFreeColumnInfo(&pQueryMsg->colList[i]);
}
retrieveRsp->numOfRows = htonl(retrieveRsp->numOfRows);
retrieveRsp->precision = htons(retrieveRsp->precision);
retrieveRsp->offset = htobe64(retrieveRsp->offset);
retrieveRsp->useconds = htobe64(retrieveRsp->useconds);
atomic_fetch_add_32(&tsDnodeSelectReqNum, 1);
return ret;
rpcSendResponse(pConn, retrieveRsp, contLen);
}
void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
}
static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn) {
dTrace("conn:%p, start to retrieve data", pConn);
void dnodeProcessRetrieveRequestCb(int code, SRetrieveMeterRsp *result, SShellObj *pObj) {
if (pObj == NULL || result == NULL || code == TSDB_CODE_ACTION_IN_PROGRESS) {
return;
}
SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *) pCont;
dnodeRetrieveData(pRetrieve, pConn, dnodeProcessRetrieveRequestCb);
}
static void dnodeProcessRetrieveRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj) {
SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *) pMsg;
dnodeRetrieveData(pRetrieve, msgLen, pObj, dnodeProcessRetrieveRequestCb);
}
void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pConn) {
assert(result != NULL);
void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pObj) {
if (pObj == NULL || result == NULL || result->code == TSDB_CODE_ACTION_IN_PROGRESS) {
if (result->code != 0) {
rpcSendSimpleRsp(pConn, result->code);
return;
}
SShellObj *pShellObj = (SShellObj *) pObj;
int32_t msgLen = sizeof(SShellSubmitRspMsg) + result->numOfFailedBlocks * sizeof(SShellSubmitRspBlock);
SShellSubmitRspMsg *submitRsp = (SShellSubmitRspMsg *) taosBuildRspMsgWithSize(pShellObj->thandle,
TSDB_MSG_TYPE_SUBMIT_RSP, msgLen);
int32_t contLen = sizeof(SShellSubmitRspMsg) + result->numOfFailedBlocks * sizeof(SShellSubmitRspBlock);
SShellSubmitRspMsg *submitRsp = (SShellSubmitRspMsg *) rpcMallocCont(contLen);
if (submitRsp == NULL) {
rpcSendSimpleRsp(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY);
return;
}
dTrace("code:%d, numOfRows:%d affectedRows:%d", result->code, result->numOfRows, result->affectedRows);
memcpy(submitRsp, result, msgLen);
memcpy(submitRsp, result, contLen);
for (int i = 0; i < submitRsp->numOfFailedBlocks; ++i) {
SShellSubmitRspBlock *block = &submitRsp->failedBlocks[i];
......@@ -368,6 +189,7 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pObj) {
} else if (block->code == TSDB_CODE_INVALID_TABLE_ID || block->code == TSDB_CODE_NOT_ACTIVE_TABLE) {
dnodeSendMeterCfgMsg(block->vnode, block->sid);
}
block->index = htonl(block->index);
block->vnode = htonl(block->vnode);
block->sid = htonl(block->sid);
block->code = htonl(block->code);
......@@ -378,21 +200,21 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pObj) {
submitRsp->failedRows = htonl(submitRsp->failedRows);
submitRsp->numOfFailedBlocks = htonl(submitRsp->numOfFailedBlocks);
taosSendMsgToPeer(pShellObj->thandle, (int8_t*)submitRsp, msgLen);
rpcSendResponse(pConn, submitRsp, contLen);
}
static void dnodeProcessShellSubmitRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj) {
SShellSubmitMsg *pSubmit = (SShellSubmitMsg *) pMsg;
dnodeWriteData(pSubmit, pObj, dnodeProcessShellSubmitRequestCb);
atomic_fetch_add_32(&tsDnodeInsertReqNum, 1);
static void dnodeProcessShellSubmitRequest(int8_t *pCont, int32_t contLen, void *pConn) {
SShellSubmitMsg *pSubmit = (SShellSubmitMsg *) pCont;
dnodeWriteData(pSubmit, pConn, dnodeProcessShellSubmitRequestCb);
atomic_fetch_add_32(&tsDnodeSubmitReqNum, 1);
}
SDnodeStatisInfo dnodeGetStatisInfo() {
SDnodeStatisInfo info = {0};
if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) {
info.httpReqNum = httpGetReqCount();
info.selectReqNum = atomic_exchange_32(&tsDnodeSelectReqNum, 0);
info.insertReqNum = atomic_exchange_32(&tsDnodeInsertReqNum, 0);
info.queryReqNum = atomic_exchange_32(&tsDnodeQueryReqNum, 0);
info.submitReqNum = atomic_exchange_32(&tsDnodeSubmitReqNum, 0);
}
return info;
......
......@@ -19,18 +19,14 @@
#include "tlog.h"
#include "dnodeWrite.h"
int32_t dnodeCheckTableExist(char *tableId) {
return 0;
}
void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pShellObj, void (*callback)(SShellSubmitRspMsg *, void *)) {
void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)) {
SShellSubmitRspMsg result = {0};
int32_t numOfSid = htonl(pSubmit->numOfSid);
if (numOfSid <= 0) {
dError("invalid num of tables:%d", numOfSid);
result.code = TSDB_CODE_INVALID_QUERY_MSG;
callback(&result, pShellObj);
callback(&result, pConn);
}
//TODO: submit implementation
......
......@@ -25,8 +25,8 @@ extern "C" {
#include "tsched.h"
typedef struct {
int32_t selectReqNum;
int32_t insertReqNum;
int32_t queryReqNum;
int32_t submitReqNum;
int32_t httpReqNum;
} SDnodeStatisInfo;
......
......@@ -238,6 +238,9 @@ extern "C" {
#define TSQL_SO_ASC 1
#define TSQL_SO_DESC 0
#define TSDB_SESSIONS_PER_VNODE (300)
#define TSDB_SESSIONS_PER_DNODE (TSDB_SESSIONS_PER_VNODE * TSDB_MAX_VNODES)
#ifdef __cplusplus
}
#endif
......
......@@ -28,114 +28,118 @@ extern "C" {
#include "taosdef.h"
// message type
#define TSDB_MSG_TYPE_REG 1
#define TSDB_MSG_TYPE_REG_RSP 2
#define TSDB_MSG_TYPE_SUBMIT 3
#define TSDB_MSG_TYPE_SUBMIT_RSP 4
#define TSDB_MSG_TYPE_NWCHANGE 5
#define TSDB_MSG_TYPE_NWCHANGE_RSP 6
#define TSDB_MSG_TYPE_DELIVER 7
#define TSDB_MSG_TYPE_DELIVER_RSP 8
#define TSDB_MSG_TYPE_CREATE 9
#define TSDB_MSG_TYPE_CREATE_RSP 10
#define TSDB_MSG_TYPE_REMOVE 11
#define TSDB_MSG_TYPE_REMOVE_RSP 12
#define TSDB_MSG_TYPE_VPEERS 13
#define TSDB_MSG_TYPE_VPEERS_RSP 14
#define TSDB_MSG_TYPE_FREE_VNODE 15
#define TSDB_MSG_TYPE_FREE_VNODE_RSP 16
#define TSDB_MSG_TYPE_VPEER_CFG 17
#define TSDB_MSG_TYPE_VPEER_CFG_RSP 18
#define TSDB_MSG_TYPE_METER_CFG 19
#define TSDB_MSG_TYPE_METER_CFG_RSP 20
#define TSDB_MSG_TYPE_VPEER_FWD 21
#define TSDB_MSG_TYPE_VPEER_FWD_RSP 22
#define TSDB_MSG_TYPE_SYNC 23
#define TSDB_MSG_TYPE_SYNC_RSP 24
#define TSDB_MSG_TYPE_INSERT 25
#define TSDB_MSG_TYPE_INSERT_RSP 26
#define TSDB_MSG_TYPE_QUERY 27
#define TSDB_MSG_TYPE_QUERY_RSP 28
#define TSDB_MSG_TYPE_RETRIEVE 29
#define TSDB_MSG_TYPE_RETRIEVE_RSP 30
#define TSDB_MSG_TYPE_CONNECT 31
#define TSDB_MSG_TYPE_CONNECT_RSP 32
#define TSDB_MSG_TYPE_CREATE_ACCT 33
#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 34
#define TSDB_MSG_TYPE_CREATE_USER 35
#define TSDB_MSG_TYPE_CREATE_USER_RSP 36
#define TSDB_MSG_TYPE_DROP_ACCT 37
#define TSDB_MSG_TYPE_DROP_ACCT_RSP 38
#define TSDB_MSG_TYPE_DROP_USER 39
#define TSDB_MSG_TYPE_DROP_USER_RSP 40
#define TSDB_MSG_TYPE_ALTER_USER 41
#define TSDB_MSG_TYPE_ALTER_USER_RSP 42
#define TSDB_MSG_TYPE_CREATE_MNODE 43
#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 44
#define TSDB_MSG_TYPE_DROP_MNODE 45
#define TSDB_MSG_TYPE_DROP_MNODE_RSP 46
#define TSDB_MSG_TYPE_CREATE_DNODE 47
#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 48
#define TSDB_MSG_TYPE_DROP_DNODE 49
#define TSDB_MSG_TYPE_DROP_DNODE_RSP 50
#define TSDB_MSG_TYPE_CREATE_DB 51
#define TSDB_MSG_TYPE_CREATE_DB_RSP 52
#define TSDB_MSG_TYPE_DROP_DB 53
#define TSDB_MSG_TYPE_DROP_DB_RSP 54
#define TSDB_MSG_TYPE_USE_DB 55
#define TSDB_MSG_TYPE_USE_DB_RSP 56
#define TSDB_MSG_TYPE_CREATE_TABLE 57
#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 58
#define TSDB_MSG_TYPE_DROP_TABLE 59
#define TSDB_MSG_TYPE_DROP_TABLE_RSP 60
#define TSDB_MSG_TYPE_METERINFO 61
#define TSDB_MSG_TYPE_METERINFO_RSP 62
#define TSDB_MSG_TYPE_METRIC_META 63
#define TSDB_MSG_TYPE_METRIC_META_RSP 64
#define TSDB_MSG_TYPE_SHOW 65
#define TSDB_MSG_TYPE_SHOW_RSP 66
#define TSDB_MSG_TYPE_FORWARD 67
#define TSDB_MSG_TYPE_FORWARD_RSP 68
#define TSDB_MSG_TYPE_CFG_PNODE 69
#define TSDB_MSG_TYPE_CFG_PNODE_RSP 70
#define TSDB_MSG_TYPE_CFG_MNODE 71
#define TSDB_MSG_TYPE_CFG_MNODE_RSP 72
#define TSDB_MSG_TYPE_KILL_QUERY 73
#define TSDB_MSG_TYPE_KILL_QUERY_RSP 74
#define TSDB_MSG_TYPE_KILL_STREAM 75
#define TSDB_MSG_TYPE_KILL_STREAM_RSP 76
#define TSDB_MSG_TYPE_KILL_CONNECTION 77
#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 78
#define TSDB_MSG_TYPE_ALTER_STREAM 79
#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 80
#define TSDB_MSG_TYPE_ALTER_TABLE 81
#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 82
#define TSDB_MSG_TYPE_ALTER_DB 83
#define TSDB_MSG_TYPE_ALTER_DB_RSP 84
#define TSDB_MSG_TYPE_MULTI_METERINFO 85
#define TSDB_MSG_TYPE_MULTI_METERINFO_RSP 86
#define TSDB_MSG_TYPE_HEARTBEAT 91
#define TSDB_MSG_TYPE_HEARTBEAT_RSP 92
#define TSDB_MSG_TYPE_STATUS 93
#define TSDB_MSG_TYPE_STATUS_RSP 94
#define TSDB_MSG_TYPE_GRANT 95
#define TSDB_MSG_TYPE_GRANT_RSP 96
#define TSDB_MSG_TYPE_ALTER_ACCT 97
#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 98
#define TSDB_MSG_TYPE_MAX 101
#define TSDB_MSG_TYPE_REG 1
#define TSDB_MSG_TYPE_REG_RSP 2
#define TSDB_MSG_TYPE_DNODE_SUBMIT 3
#define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4
#define TSDB_MSG_TYPE_DNODE_QUERY 5
#define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6
#define TSDB_MSG_TYPE_DNODE_RETRIEVE 7
#define TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP 8
#define TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE 9
#define TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE_RSP 10
#define TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE 11
#define TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE_RSP 12
#define TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE 13
#define TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE_RSP 14
#define TSDB_MSG_TYPE_DNODE_CREATE_SUPER_TABLE 15
#define TSDB_MSG_TYPE_DNODE_CREATE_SUPER_TABLE_RSP 16
#define TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE 17
#define TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE_RSP 18
#define TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE 19
#define TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE_RSP 20
#define TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE 21
#define TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE_RSP 22
#define TSDB_MSG_TYPE_DNODE_REMOVE_SUPER_TABLE 23
#define TSDB_MSG_TYPE_DNODE_REMOVE_SUPER_TABLE_RSP 24
#define TSDB_MSG_TYPE_DNODE_ALTER_CHILD_TABLE 25
#define TSDB_MSG_TYPE_DNODE_ALTER_CHILD_TABLE_RSP 26
#define TSDB_MSG_TYPE_DNODE_ALTER_NORMAL_TABLE 27
#define TSDB_MSG_TYPE_DNODE_ALTER_NORMAL_TABLE_RSP 28
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_TABLE 29
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_TABLE_RSP 30
#define TSDB_MSG_TYPE_DNODE_ALTER_SUPER_TABLE 31
#define TSDB_MSG_TYPE_DNODE_ALTER_SUPER_TABLE_RSP 32
#define TSDB_MSG_TYPE_DNODE_VPEERS 33
#define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 34
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE 35
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 36
#define TSDB_MSG_TYPE_DNODE_CFG 37
#define TSDB_MSG_TYPE_DNODE_CFG_RSP 38
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 39
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_RSP 40
#define TSDB_MSG_TYPE_SDB_SYNC 41
#define TSDB_MSG_TYPE_SDB_SYNC_RSP 42
#define TSDB_MSG_TYPE_SDB_FORWARD 43
#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 44
#define TSDB_MSG_TYPE_CONNECT 51
#define TSDB_MSG_TYPE_CONNECT_RSP 52
#define TSDB_MSG_TYPE_CREATE_ACCT 53
#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 54
#define TSDB_MSG_TYPE_ALTER_ACCT 55
#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 56
#define TSDB_MSG_TYPE_DROP_ACCT 57
#define TSDB_MSG_TYPE_DROP_ACCT_RSP 58
#define TSDB_MSG_TYPE_CREATE_USER 59
#define TSDB_MSG_TYPE_CREATE_USER_RSP 60
#define TSDB_MSG_TYPE_ALTER_USER 61
#define TSDB_MSG_TYPE_ALTER_USER_RSP 62
#define TSDB_MSG_TYPE_DROP_USER 63
#define TSDB_MSG_TYPE_DROP_USER_RSP 64
#define TSDB_MSG_TYPE_CREATE_MNODE 65
#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 66
#define TSDB_MSG_TYPE_DROP_MNODE 67
#define TSDB_MSG_TYPE_DROP_MNODE_RSP 68
#define TSDB_MSG_TYPE_CREATE_DNODE 69
#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 70
#define TSDB_MSG_TYPE_DROP_DNODE 71
#define TSDB_MSG_TYPE_DROP_DNODE_RSP 72
#define TSDB_MSG_TYPE_ALTER_DNODE 73
#define TSDB_MSG_TYPE_ALTER_DNODE_RSP 74
#define TSDB_MSG_TYPE_CREATE_DB 75
#define TSDB_MSG_TYPE_CREATE_DB_RSP 76
#define TSDB_MSG_TYPE_DROP_DB 77
#define TSDB_MSG_TYPE_DROP_DB_RSP 78
#define TSDB_MSG_TYPE_USE_DB 79
#define TSDB_MSG_TYPE_USE_DB_RSP 80
#define TSDB_MSG_TYPE_ALTER_DB 81
#define TSDB_MSG_TYPE_ALTER_DB_RSP 82
#define TSDB_MSG_TYPE_CREATE_TABLE 83
#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 84
#define TSDB_MSG_TYPE_DROP_TABLE 85
#define TSDB_MSG_TYPE_DROP_TABLE_RSP 86
#define TSDB_MSG_TYPE_ALTER_TABLE 87
#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 88
#define TSDB_MSG_TYPE_VNODE_CFG 89
#define TSDB_MSG_TYPE_VNODE_CFG_RSP 90
#define TSDB_MSG_TYPE_TABLE_CFG 91
#define TSDB_MSG_TYPE_TABLE_CFG_RSP 92
#define TSDB_MSG_TYPE_TABLE_META 93
#define TSDB_MSG_TYPE_TABLE_META_RSP 94
#define TSDB_MSG_TYPE_STABLE_META 95
#define TSDB_MSG_TYPE_STABLE_META_RSP 96
#define TSDB_MSG_TYPE_MULTI_TABLE_META 97
#define TSDB_MSG_TYPE_MULTI_TABLE_META_RSP 98
#define TSDB_MSG_TYPE_ALTER_STREAM 99
#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 100
#define TSDB_MSG_TYPE_SHOW 101
#define TSDB_MSG_TYPE_SHOW_RSP 102
#define TSDB_MSG_TYPE_CFG_MNODE 103
#define TSDB_MSG_TYPE_CFG_MNODE_RSP 104
#define TSDB_MSG_TYPE_KILL_QUERY 105
#define TSDB_MSG_TYPE_KILL_QUERY_RSP 106
#define TSDB_MSG_TYPE_KILL_STREAM 107
#define TSDB_MSG_TYPE_KILL_STREAM_RSP 108
#define TSDB_MSG_TYPE_KILL_CONNECTION 109
#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 110
#define TSDB_MSG_TYPE_HEARTBEAT 111
#define TSDB_MSG_TYPE_HEARTBEAT_RSP 112
#define TSDB_MSG_TYPE_STATUS 113
#define TSDB_MSG_TYPE_STATUS_RSP 114
#define TSDB_MSG_TYPE_GRANT 115
#define TSDB_MSG_TYPE_GRANT_RSP 116
#define TSDB_MSG_TYPE_MAX 117
// IE type
#define TSDB_IE_TYPE_SEC 1
......@@ -287,13 +291,14 @@ typedef struct {
} SShellSubmitMsg;
typedef struct {
int32_t index; // index of failed block in submit blocks
int32_t vnode; // vnode index of failed block
int32_t sid; // table index of failed block
int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table
} SShellSubmitRspBlock;
typedef struct {
int32_t code; // 0-success, 1-inprogress, > 1 error code
int32_t code; // 0-success, > 0 error code
int32_t numOfRows; // number of records the client is trying to write
int32_t affectedRows; // number of records actually written
int32_t failedRows; // number of failed records (exclude duplicate records)
......@@ -301,7 +306,6 @@ typedef struct {
SShellSubmitRspBlock *failedBlocks;
} SShellSubmitRspMsg;
typedef struct SSchema {
uint8_t type;
char name[TSDB_COL_NAME_LEN];
......
......@@ -50,16 +50,16 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
SVgObj * pVgroup;
if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, TSDB_CODE_REDIRECT);
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_REDIRECT);
return 0;
}
int vnode = htonl(pCfg->vnode);
int sid = htonl(pCfg->sid);
pStart = taosBuildRspMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, 64000);
pStart = taosBuildRspMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, 64000);
if (pStart == NULL) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
......@@ -100,15 +100,15 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
SVgObj * pVgroup = NULL;
if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP, TSDB_CODE_REDIRECT);
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_REDIRECT);
return 0;
}
int vnode = htonl(pCfg->vnode);
pStart = taosBuildRspMsgToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP);
pStart = taosBuildRspMsgToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP);
if (pStart == NULL) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
pMsg = pStart;
......@@ -142,7 +142,7 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) {
STaosRsp *pRsp = (STaosRsp *)msg;
if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEERS_RSP, TSDB_CODE_REDIRECT);
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP, TSDB_CODE_REDIRECT);
return 0;
}
......@@ -172,19 +172,19 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) {
}
void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj) {
if (msgType == TSDB_MSG_TYPE_METER_CFG) {
if (msgType == TSDB_MSG_TYPE_TABLE_CFG) {
mgmtProcessMeterCfgMsg(content, msgLen - sizeof(SIntMsg), pObj);
} else if (msgType == TSDB_MSG_TYPE_VPEER_CFG) {
} else if (msgType == TSDB_MSG_TYPE_VNODE_CFG) {
mgmtProcessVpeerCfgMsg(content, msgLen - sizeof(SIntMsg), pObj);
} else if (msgType == TSDB_MSG_TYPE_CREATE_RSP) {
} else if (msgType == TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE_RSP) {
mgmtProcessCreateRsp(content, msgLen - sizeof(SIntMsg), pObj);
} else if (msgType == TSDB_MSG_TYPE_REMOVE_RSP) {
// do nothing
} else if (msgType == TSDB_MSG_TYPE_VPEERS_RSP) {
} else if (msgType == TSDB_MSG_TYPE_DNODE_VPEERS_RSP) {
mgmtProcessVPeersRsp(content, msgLen - sizeof(SIntMsg), pObj);
} else if (msgType == TSDB_MSG_TYPE_FREE_VNODE_RSP) {
} else if (msgType == TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP) {
mgmtProcessFreeVnodeRsp(content, msgLen - sizeof(SIntMsg), pObj);
} else if (msgType == TSDB_MSG_TYPE_CFG_PNODE_RSP) {
} else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) {
// do nothing;
} else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) {
// do nothing;
......@@ -243,7 +243,7 @@ int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int
continue;
}
int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000);
int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE, 64000);
if (pStart == NULL) {
continue;
}
......@@ -267,7 +267,7 @@ int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) {
continue;
}
int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000);
int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE, 64000);
if (pStart == NULL) {
continue;
}
......@@ -291,7 +291,7 @@ int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) {
continue;
}
int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000);
int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE, 64000);
if (pStart == NULL) {
continue;
}
......@@ -322,7 +322,7 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) {
pObj = mgmtGetDnode(pVgroup->vnodeGid[i].ip);
if (pObj == NULL) continue;
pStart = taosBuildReqMsgToDnode(pObj, TSDB_MSG_TYPE_REMOVE);
pStart = taosBuildReqMsgToDnode(pObj, TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE);
if (pStart == NULL) continue;
pMsg = pStart;
......@@ -428,7 +428,7 @@ int mgmtSendVPeersMsg(SVgObj *pVgroup) {
mgmtUpdateDnode(pDnode);
if (pDnode->thandle && pVgroup->numOfVnodes >= 1) {
pStart = taosBuildReqMsgToDnode(pDnode, TSDB_MSG_TYPE_VPEERS);
pStart = taosBuildReqMsgToDnode(pDnode, TSDB_MSG_TYPE_DNODE_VPEERS);
if (pStart == NULL) continue;
pMsg = mgmtBuildVpeersIe(pStart, pVgroup, pVgroup->vnodeGid[i].vnode);
msgLen = pMsg - pStart;
......@@ -457,7 +457,7 @@ int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid) {
return -1;
}
pStart = taosBuildReqMsgToDnode(pDnode, TSDB_MSG_TYPE_FREE_VNODE);
pStart = taosBuildReqMsgToDnode(pDnode, TSDB_MSG_TYPE_DNODE_FREE_VNODE);
if (pStart == NULL) return -1;
pMsg = pStart;
......@@ -539,7 +539,7 @@ int mgmtSendCfgDnodeMsg(char *cont) {
}
#ifdef CLUSTER
pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_CFG_PNODE);
pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_DNODE_CFG);
if (pStart == NULL) return TSDB_CODE_NODE_OFFLINE;
pMsg = pStart;
......
......@@ -138,7 +138,7 @@ static uint32_t mgmtSetMeterTagValue(char *pTags, STabObj *pMetric, STabObj *pMe
}
static char *mgmtAllocMsg(SConnObj *pConn, int32_t size, char **pMsg, STaosRsp **pRsp) {
char *pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, size);
char *pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, size);
if (pStart == NULL) return 0;
*pMsg = pStart;
*pRsp = (STaosRsp *)(*pMsg);
......@@ -147,7 +147,7 @@ static char *mgmtAllocMsg(SConnObj *pConn, int32_t size, char **pMsg, STaosRsp *
}
static char *mgmtForMultiAllocMsg(SConnObj *pConn, int32_t size, char **pMsg, STaosRsp **pRsp) {
char *pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_MULTI_METERINFO_RSP, size);
char *pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_MULTI_TABLE_META_RSP, size);
if (pStart == NULL) return 0;
*pMsg = pStart;
*pRsp = (STaosRsp *)(*pMsg);
......@@ -197,7 +197,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
if (pDb == NULL || (pDb != NULL && pDb->dropStatus != TSDB_DB_STATUS_READY)) {
if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
......@@ -212,13 +212,13 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
// on demand create table from super table if meter does not exists
if (pMeterObj == NULL && pInfo->createFlag == 1) {
// write operation needs to redirect to master mnode
if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_METERINFO_RSP) != 0) {
if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_TABLE_META_RSP) != 0) {
return 0;
}
SCreateTableMsg *pCreateMsg = calloc(1, sizeof(SCreateTableMsg) + sizeof(STagData));
if (pCreateMsg == NULL) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
......@@ -241,7 +241,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
if (code != TSDB_CODE_SUCCESS) {
if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
......@@ -255,7 +255,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
}
if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
......@@ -364,7 +364,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
char *pNewMsg;
if ((pStart = mgmtForMultiAllocMsg(pConn, size, &pNewMsg, &pRsp)) == NULL) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
......@@ -393,7 +393,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
if (NULL == pMsgHdr) {
char* pTmp = pStart - sizeof(STaosHeader);
tfree(pTmp);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
break;
}
......@@ -523,9 +523,9 @@ int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
if (pMetric == NULL || (pDb != NULL && pDb->dropStatus != TSDB_DB_STATUS_READY)) {
pStart = taosBuildRspMsg(pConn->thandle, TSDB_MSG_TYPE_METRIC_META_RSP);
pStart = taosBuildRspMsg(pConn->thandle, TSDB_MSG_TYPE_STABLE_META_RSP);
if (pStart == NULL) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METRIC_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_STABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
......@@ -541,7 +541,7 @@ int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
} else {
msgLen = mgmtRetrieveMetricMeta(pConn, &pStart, pSuperTableMetaMsg);
if (msgLen <= 0) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METRIC_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_STABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
}
......@@ -999,14 +999,14 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) {
*/
if (pRetrieve->qhandle != pConn->qhandle) {
mError("retrieve:%p, qhandle:%p is not matched with saved:%p", pRetrieve, pRetrieve->qhandle, pConn->qhandle);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED);
return -1;
}
pShow = (SShowObj *)pRetrieve->qhandle;
if (pShow->signature != (void *)pShow) {
mError("pShow:%p, signature:%p, query memory is corrupted", pShow, pShow->signature);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED);
return -1;
} else {
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
......@@ -1024,9 +1024,9 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) {
size = pShow->rowSize * rowsToRead;
}
pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, size + 100);
pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, size + 100);
if (pStart == NULL) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
......@@ -1206,7 +1206,7 @@ int mgmtProcessCfgDnodeMsg(char *pMsg, int msgLen, SConnObj *pConn) {
code = mgmtSendCfgDnodeMsg(pMsg);
}
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CFG_PNODE_RSP, code);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_CFG_RSP, code);
if (code == 0) mTrace("dnode:%s is configured by %s", pCfg->ip, pConn->pUser->user);
......@@ -1464,9 +1464,9 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
int contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead);
// read-only request can be executed concurrently
if ((pMsg->msgType == TSDB_MSG_TYPE_METERINFO && (!mgmtCheckMeterMetaMsgType(cont))) ||
pMsg->msgType == TSDB_MSG_TYPE_METRIC_META || pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE ||
pMsg->msgType == TSDB_MSG_TYPE_SHOW || pMsg->msgType == TSDB_MSG_TYPE_MULTI_METERINFO) {
if ((pMsg->msgType == TSDB_MSG_TYPE_TABLE_META && (!mgmtCheckMeterMetaMsgType(cont))) ||
pMsg->msgType == TSDB_MSG_TYPE_STABLE_META || pMsg->msgType == TSDB_MSG_TYPE_DNODE_RETRIEVE ||
pMsg->msgType == TSDB_MSG_TYPE_SHOW || pMsg->msgType == TSDB_MSG_TYPE_MULTI_TABLE_META) {
(*mgmtProcessShellMsg[pMsg->msgType])(cont, contLen, pConn);
} else {
if (mgmtProcessShellMsg[pMsg->msgType]) {
......@@ -1498,9 +1498,9 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
}
void mgmtInitProcessShellMsg() {
mgmtProcessShellMsg[TSDB_MSG_TYPE_METERINFO] = mgmtProcessMeterMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_METRIC_META] = mgmtProcessMetricMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_MULTI_METERINFO] = mgmtProcessMultiMeterMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_TABLE_META] = mgmtProcessMeterMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_STABLE_META] = mgmtProcessMetricMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_MULTI_TABLE_META] = mgmtProcessMultiMeterMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_DB] = mgmtProcessCreateDbMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_DB] = mgmtProcessAlterDbMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_USER] = mgmtProcessCreateUserMsg;
......@@ -1516,7 +1516,7 @@ void mgmtInitProcessShellMsg() {
mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_TABLE] = mgmtProcessAlterTableMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_USE_DB] = mgmtProcessUseDbMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_RETRIEVE] = mgmtProcessRetrieveMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_RETRIEVE] = mgmtProcessRetrieveMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_SHOW] = mgmtProcessShowMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg;
......@@ -1525,7 +1525,7 @@ void mgmtInitProcessShellMsg() {
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_MNODE] = mgmtProcessCreateMnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_MNODE] = mgmtProcessDropMnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CFG_MNODE] = mgmtProcessCfgMnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CFG_PNODE] = mgmtProcessCfgDnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_CFG] = mgmtProcessCfgDnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_QUERY] = mgmtProcessKillQueryMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_STREAM] = mgmtProcessKillStreamMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_CONNECTION] = mgmtProcessKillConnectionMsg;
......
......@@ -334,10 +334,10 @@ int monitorBuildBandSql(char *sql) {
int monitorBuildReqSql(char *sql) {
SDnodeStatisInfo info;
info.httpReqNum = info.insertReqNum = info.selectReqNum = 0;
info.httpReqNum = info.submitReqNum = info.queryReqNum = 0;
(*mnodeCountRequestFp)(&info);
return sprintf(sql, ", %d, %d, %d)", info.httpReqNum, info.selectReqNum, info.insertReqNum);
return sprintf(sql, ", %d, %d, %d)", info.httpReqNum, info.queryReqNum, info.submitReqNum);
}
int monitorBuildIoSql(char *sql) {
......
......@@ -13,112 +13,134 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
char *taosMsg[] = {"null",
"registration",
"registration-rsp",
"submit",
"submit-rsp",
"nw-change",
"nw-change-rsp",
"deliver",
"deliver-rsp",
char *taosMsg[] = {
"null",
"registration",
"registration-rsp",
"submit",
"submit-rsp",
"query",
"query-rsp",
"retrieve",
"retrieve-rsp",
"create-table",
"create-table-rsp", //10
"create",
"create-rsp",
"remove",
"remove-rsp",
"vpeers",
"vpeers-rsp",
"free-vnode",
"free-vnode-rsp",
"vpeer-cfg",
"vpeer-cfg-rsp",
"meter-cfg",
"meter-cfg-rsp",
"create-normal-table",
"create-normal-table-rsp",
"create-stream-table",
"create-stream-table-rsp",
"create-super-table",
"create-super-table-rsp",
"remove-table",
"remove-table-rsp",
"remove-normal-table",
"remove-normal-table-rsp", //20
"vpeer-fwd",
"vpeer-fwd-rsp",
"sync",
"sync-rsp",
"remove-stream-table",
"remove-stream-table-rsp",
"remove-super-table",
"remove-super-table-rsp",
"alter-table",
"alter-table-rsp",
"alter-normal-table",
"alter-normal-table-rsp",
"alter-stream-table",
"alter-stream-table-rsp", //30
"insert",
"insert-rsp",
"query",
"query-rsp",
"retrieve",
"retrieve-rsp",
"alter-super-table",
"alter-super-table-rsp",
"vpeers",
"vpeers-rsp",
"free-vnode",
"free-vnode-rsp",
"cfg-dnode",
"cfg-dnode-rsp",
"alter-stream",
"alter-stream-rsp", //40
"connect",
"connect-rsp",
"create-acct",
"create-acct-rsp",
"create-user",
"create-user-rsp",
"drop-acct",
"drop-acct-rsp",
"drop-user",
"drop-user-rsp",
"alter-user",
"alter-user-rsp",
"create-mnode",
"create-mnode-rsp",
"drop-mnode",
"drop-mnode-rsp",
"create-dnode",
"create-dnode-rsp",
"drop-dnode",
"drop-dnode-rsp",
"create-db",
"create-db-rsp",
"drop-db",
"drop-db-rsp",
"use-db",
"use-db-rsp",
"create-table",
"create-table-rsp",
"drop-table",
"drop-table-rsp",
"meter-info",
"meter-info-rsp",
"metric-meta",
"metric-meta-rsp",
"show",
"show-rsp",
"sync",
"sync-rsp",
"forward",
"forward-rsp",
"",
"",
"",
"",
"",
"", //50
"forward",
"forward-rsp",
"connect",
"connect-rsp",
"create-acct",
"create-acct-rsp",
"alter-acct",
"alter-acct-rsp",
"drop-acct",
"drop-acct-rsp",
"create-user",
"create-user-rsp", //60
"cfg-dnode",
"cfg-dnode-rsp",
"cfg-mnode",
"cfg-mnode-rsp",
"alter-user",
"alter-user-rsp",
"drop-user",
"drop-user-rsp",
"create-mnode",
"create-mnode-rsp",
"drop-mnode",
"drop-mnode-rsp",
"create-dnode",
"create-dnode-rsp", //70
"kill-query",
"kill-query-rsp",
"kill-stream",
"kill-stream-rsp",
"kill-connection",
"kill-connectoin-rsp", // 78
"alter-stream",
"alter-stream-rsp",
"alter-table",
"alter-table-rsp",
"drop-dnode",
"drop-dnode-rsp",
"alter-dnode",
"alter-dnode-rsp",
"create-db",
"create-db-rsp",
"drop-db",
"drop-db-rsp",
"use-db",
"use-db-rsp", //80
"",
"",
"",
"",
"",
"",
"",
"",
"alter-db",
"alter-db-rsp",
"create-table",
"create-table-rsp",
"drop-table",
"drop-table-rsp",
"alter-table",
"alter-table-rsp",
"cfg-vnode",
"cfg-vnode-rsp", //90
"heart-beat", // 91
"heart-beat-rsp",
"status",
"status-rsp",
"grant",
"grant-rsp",
"alter-acct",
"alter-acct-rsp",
"invalid"};
"cfg-table",
"cfg-table-rsp",
"table-meta",
"table-meta-rsp",
"super-table-meta",
"super-stable-meta-rsp",
"multi-table-meta",
"multi-table-meta-rsp",
"alter-stream",
"alter-stream-rsp", //100
"show",
"show-rsp",
"cfg-mnode",
"cfg-mnode-rsp",
"kill-query",
"kill-query-rsp",
"kill-stream",
"kill-stream-rsp",
"kill-connection",
"kill-connectoin-rsp", //110
"heart-beat",
"heart-beat-rsp",
"status",
"status-rsp",
"grant",
"grant-rsp",
"max"
};
\ No newline at end of file
......@@ -25,7 +25,7 @@ typedef struct _sched_msg {
void (*tfp)(void *, void *);
char *msg;
int8_t *msg;
void *ahandle;
void *thandle;
} SSchedMsg;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册