提交 7e24be74 编写于 作者: W wenzhouwww@live.cn

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

...@@ -1648,6 +1648,15 @@ typedef struct { ...@@ -1648,6 +1648,15 @@ typedef struct {
int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
int8_t igNotExists;
} SMDropCgroupReq;
int32_t tSerializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq);
int32_t tDeserializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq);
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType; int8_t alterType;
......
...@@ -2627,6 +2627,35 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR ...@@ -2627,6 +2627,35 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR
return 0; return 0;
} }
int32_t tSerializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->topic) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->cgroup) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->topic) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->cgroup) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTopicReq *pReq) { int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTopicReq *pReq) {
int32_t sqlLen = 0; int32_t sqlLen = 0;
int32_t astLen = 0; int32_t astLen = 0;
......
...@@ -36,9 +36,9 @@ typedef struct SBnodeMgmt { ...@@ -36,9 +36,9 @@ typedef struct SBnodeMgmt {
// bmHandle.c // bmHandle.c
SArray *bmGetMsgHandles(); SArray *bmGetMsgHandles();
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq); int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq); int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg);
// bmWorker.c // bmWorker.c
int32_t bmStartWorker(SBnodeMgmt *pMgmt); int32_t bmStartWorker(SBnodeMgmt *pMgmt);
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {} void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) { int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonBmInfo bmInfo = {0}; SMonBmInfo bmInfo = {0};
bmGetMonitorInfo(pMgmt, &bmInfo); bmGetMonitorInfo(pMgmt, &bmInfo);
dmGetMonitorSystemInfo(&bmInfo.sys); dmGetMonitorSystemInfo(&bmInfo.sys);
...@@ -37,17 +37,15 @@ int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) { ...@@ -37,17 +37,15 @@ int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) {
} }
tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo); tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo);
pReq->info.rsp = pRsp; pMsg->info.rsp = pRsp;
pReq->info.rspLen = rspLen; pMsg->info.rspLen = rspLen;
tFreeSMonBmInfo(&bmInfo); tFreeSMonBmInfo(&bmInfo);
return 0; return 0;
} }
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateBnodeReq createReq = {0}; SDCreateBnodeReq createReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
...@@ -68,10 +66,8 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -68,10 +66,8 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
} }
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropBnodeReq dropReq = {0}; SDDropBnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
......
...@@ -41,18 +41,18 @@ typedef struct SMnodeMgmt { ...@@ -41,18 +41,18 @@ typedef struct SMnodeMgmt {
// mmFile.c // mmFile.c
int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed); int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed);
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed); int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed);
// mmInt.c // mmInt.c
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq); int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg);
// mmHandle.c // mmHandle.c
SArray *mmGetMsgHandles(); SArray *mmGetMsgHandles();
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
// mmWorker.c // mmWorker.c
int32_t mmStartWorker(SMnodeMgmt *pMgmt); int32_t mmStartWorker(SMnodeMgmt *pMgmt);
...@@ -62,10 +62,10 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); ...@@ -62,10 +62,10 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -104,7 +104,7 @@ _OVER: ...@@ -104,7 +104,7 @@ _OVER:
return code; return code;
} }
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) { int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) {
char file[PATH_MAX] = {0}; char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP); snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
...@@ -124,11 +124,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) { ...@@ -124,11 +124,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) {
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
int8_t replica = (pReq != NULL ? pReq->replica : pMgmt->replica); int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica);
for (int32_t i = 0; i < replica; ++i) { for (int32_t i = 0; i < replica; ++i) {
SReplica *pReplica = &pMgmt->replicas[i]; SReplica *pReplica = &pMgmt->replicas[i];
if (pReq != NULL) { if (pMsg != NULL) {
pReplica = &pReq->replicas[i]; pReplica = &pMsg->replicas[i];
} }
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
......
...@@ -25,7 +25,7 @@ void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { ...@@ -25,7 +25,7 @@ void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
mndGetLoad(pMgmt->pMnode, &pInfo->load); mndGetLoad(pMgmt->pMnode, &pInfo->load);
} }
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonMmInfo mmInfo = {0}; SMonMmInfo mmInfo = {0};
mmGetMonitorInfo(pMgmt, &mmInfo); mmGetMonitorInfo(pMgmt, &mmInfo);
dmGetMonitorSystemInfo(&mmInfo.sys); dmGetMonitorSystemInfo(&mmInfo.sys);
...@@ -44,13 +44,13 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { ...@@ -44,13 +44,13 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
} }
tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo); tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo);
pReq->info.rsp = pRsp; pMsg->info.rsp = pRsp;
pReq->info.rspLen = rspLen; pMsg->info.rspLen = rspLen;
tFreeSMonMmInfo(&mmInfo); tFreeSMonMmInfo(&mmInfo);
return 0; return 0;
} }
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonMloadInfo mloads = {0}; SMonMloadInfo mloads = {0};
mmGetMnodeLoads(pMgmt, &mloads); mmGetMnodeLoads(pMgmt, &mloads);
...@@ -67,16 +67,14 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { ...@@ -67,16 +67,14 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
} }
tSerializeSMonMloadInfo(pRsp, rspLen, &mloads); tSerializeSMonMloadInfo(pRsp, rspLen, &mloads);
pReq->info.rsp = pRsp; pMsg->info.rsp = pRsp;
pReq->info.rspLen = rspLen; pMsg->info.rspLen = rspLen;
return 0; return 0;
} }
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateMnodeReq createReq = {0}; SDCreateMnodeReq createReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
...@@ -101,10 +99,8 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -101,10 +99,8 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
} }
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropMnodeReq dropReq = {0}; SDDropMnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
...@@ -129,10 +125,8 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -129,10 +125,8 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
} }
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDAlterMnodeReq alterReq = {0}; SDAlterMnodeReq alterReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
......
...@@ -87,9 +87,9 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre ...@@ -87,9 +87,9 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre
return 0; return 0;
} }
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) { int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) {
SMnodeOpt option = {0}; SMnodeOpt option = {0};
if (mmBuildOptionFromReq(pMgmt, &option, pReq) != 0) { if (mmBuildOptionFromReq(pMgmt, &option, pMsg) != 0) {
return -1; return -1;
} }
...@@ -98,7 +98,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) { ...@@ -98,7 +98,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) {
} }
bool deployed = true; bool deployed = true;
if (mmWriteFile(pMgmt, pReq, deployed) != 0) { if (mmWriteFile(pMgmt, pMsg, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr()); dError("failed to write mnode file since %s", terrstr());
return -1; return -1;
} }
...@@ -135,7 +135,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -135,7 +135,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue; pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue;
pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue; pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue;
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue;
pMgmt->msgCb.mgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
bool deployed = false; bool deployed = false;
......
...@@ -126,7 +126,7 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -126,7 +126,7 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg); return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg);
} }
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); } int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); }
int32_t mmStartWorker(SMnodeMgmt *pMgmt) { int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SSingleWorkerCfg qCfg = { SSingleWorkerCfg qCfg = {
......
...@@ -39,7 +39,7 @@ typedef struct SQnodeMgmt { ...@@ -39,7 +39,7 @@ typedef struct SQnodeMgmt {
SArray *qmGetMsgHandles(); SArray *qmGetMsgHandles();
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
// qmWorker.c // qmWorker.c
int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {} void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) { int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonQmInfo qmInfo = {0}; SMonQmInfo qmInfo = {0};
qmGetMonitorInfo(pMgmt, &qmInfo); qmGetMonitorInfo(pMgmt, &qmInfo);
dmGetMonitorSystemInfo(&qmInfo.sys); dmGetMonitorSystemInfo(&qmInfo.sys);
...@@ -37,17 +37,15 @@ int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) { ...@@ -37,17 +37,15 @@ int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) {
} }
tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo); tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo);
pReq->info.rsp = pRsp; pMsg->info.rsp = pRsp;
pReq->info.rspLen = rspLen; pMsg->info.rspLen = rspLen;
tFreeSMonQmInfo(&qmInfo); tFreeSMonQmInfo(&qmInfo);
return 0; return 0;
} }
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateQnodeReq createReq = {0}; SDCreateQnodeReq createReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
...@@ -68,10 +66,8 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -68,10 +66,8 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
} }
int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropQnodeReq dropReq = {0}; SDDropQnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
......
...@@ -40,7 +40,7 @@ typedef struct SSnodeMgmt { ...@@ -40,7 +40,7 @@ typedef struct SSnodeMgmt {
SArray *smGetMsgHandles(); SArray *smGetMsgHandles();
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
// smWorker.c // smWorker.c
int32_t smStartWorker(SSnodeMgmt *pMgmt); int32_t smStartWorker(SSnodeMgmt *pMgmt);
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {} void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) { int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonSmInfo smInfo = {0}; SMonSmInfo smInfo = {0};
smGetMonitorInfo(pMgmt, &smInfo); smGetMonitorInfo(pMgmt, &smInfo);
dmGetMonitorSystemInfo(&smInfo.sys); dmGetMonitorSystemInfo(&smInfo.sys);
...@@ -37,17 +37,15 @@ int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) { ...@@ -37,17 +37,15 @@ int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) {
} }
tSerializeSMonSmInfo(pRsp, rspLen, &smInfo); tSerializeSMonSmInfo(pRsp, rspLen, &smInfo);
pReq->info.rsp = pRsp; pMsg->info.rsp = pRsp;
pReq->info.rspLen = rspLen; pMsg->info.rspLen = rspLen;
tFreeSMonSmInfo(&smInfo); tFreeSMonSmInfo(&smInfo);
return 0; return 0;
} }
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateSnodeReq createReq = {0}; SDCreateSnodeReq createReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
...@@ -68,10 +66,8 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -68,10 +66,8 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
} }
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropSnodeReq dropReq = {0}; SDDropSnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
......
...@@ -84,10 +84,10 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); ...@@ -84,10 +84,10 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
// vmHandle.c // vmHandle.c
SArray *vmGetMsgHandles(); SArray *vmGetMsgHandles();
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
// vmFile.c // vmFile.c
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
......
...@@ -82,7 +82,7 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { ...@@ -82,7 +82,7 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
taosArrayDestroy(pVloads); taosArrayDestroy(pVloads);
} }
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonVmInfo vmInfo = {0}; SMonVmInfo vmInfo = {0};
vmGetMonitorInfo(pMgmt, &vmInfo); vmGetMonitorInfo(pMgmt, &vmInfo);
dmGetMonitorSystemInfo(&vmInfo.sys); dmGetMonitorSystemInfo(&vmInfo.sys);
...@@ -101,13 +101,13 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { ...@@ -101,13 +101,13 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
} }
tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo); tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo);
pReq->info.rsp = pRsp; pMsg->info.rsp = pRsp;
pReq->info.rspLen = rspLen; pMsg->info.rspLen = rspLen;
tFreeSMonVmInfo(&vmInfo); tFreeSMonVmInfo(&vmInfo);
return 0; return 0;
} }
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonVloadInfo vloads = {0}; SMonVloadInfo vloads = {0};
vmGetVnodeLoads(pMgmt, &vloads); vmGetVnodeLoads(pMgmt, &vloads);
...@@ -124,8 +124,8 @@ int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { ...@@ -124,8 +124,8 @@ int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
} }
tSerializeSMonVloadInfo(pRsp, rspLen, &vloads); tSerializeSMonVloadInfo(pRsp, rspLen, &vloads);
pReq->info.rsp = pRsp; pMsg->info.rsp = pRsp;
pReq->info.rspLen = rspLen; pMsg->info.rspLen = rspLen;
tFreeSMonVloadInfo(&vloads); tFreeSMonVloadInfo(&vloads);
return 0; return 0;
} }
...@@ -174,12 +174,11 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW ...@@ -174,12 +174,11 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW
} }
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SCreateVnodeReq createReq = {0}; SCreateVnodeReq createReq = {0};
int32_t code = -1; int32_t code = -1;
char path[TSDB_FILENAME_LEN] = {0}; char path[TSDB_FILENAME_LEN] = {0};
if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
...@@ -242,9 +241,8 @@ _OVER: ...@@ -242,9 +241,8 @@ _OVER:
} }
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDropVnodeReq dropReq = {0}; SDropVnodeReq dropReq = {0};
if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
......
...@@ -200,12 +200,12 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { ...@@ -200,12 +200,12 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
return 0; return 0;
} }
static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) { static void dmSendRpcRedirectRsp(const SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance(); SDnode *pDnode = dmInstance();
SEpSet epSet = {0}; SEpSet epSet = {0};
dmGetMnodeEpSet(&pDnode->data, &epSet); dmGetMnodeEpSet(&pDnode->data, &epSet);
dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse); dDebug("RPC %p, req is redirected, num:%d use:%d", pMsg->info.handle, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) { for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) { if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
...@@ -220,12 +220,14 @@ static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) { ...@@ -220,12 +220,14 @@ static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) {
SRpcMsg rsp = { SRpcMsg rsp = {
.code = TSDB_CODE_RPC_REDIRECT, .code = TSDB_CODE_RPC_REDIRECT,
.info = pReq->info, .info = pMsg->info,
.contLen = len, .contLen = len,
}; };
rsp.pCont = rpcMallocCont(len); rsp.pCont = rpcMallocCont(len);
tSerializeSMEpSet(rsp.pCont, len, &msg); tSerializeSMEpSet(rsp.pCont, len, &msg);
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
} }
static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
...@@ -239,16 +241,16 @@ static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { ...@@ -239,16 +241,16 @@ static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
} }
} }
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) { static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance(); SDnode *pDnode = dmInstance();
if (pDnode->status != DND_STAT_RUNNING) { if (pDnode->status != DND_STAT_RUNNING) {
rpcFreeCont(pReq->pCont); rpcFreeCont(pMsg->pCont);
pReq->pCont = NULL; pMsg->pCont = NULL;
terrno = TSDB_CODE_NODE_OFFLINE; terrno = TSDB_CODE_NODE_OFFLINE;
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->info.handle); dError("failed to send rpc msg since %s, handle:%p", terrstr(), pMsg->info.handle);
return -1; return -1;
} else { } else {
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL); rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
return 0; return 0;
} }
} }
......
...@@ -624,14 +624,12 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { ...@@ -624,14 +624,12 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info}; SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info};
mInfo("dnode:%d, app:%p config:%s req send to dnode", cfgReq.dnodeId, rpcMsg.info.ahandle, cfgReq.config); mDebug("dnode:%d, send config req to dnode, app:%p", cfgReq.dnodeId, rpcMsg.info.ahandle);
tmsgSendReq(&epSet, &rpcMsg); return tmsgSendReq(&epSet, &rpcMsg);
return 0;
} }
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) { static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
mInfo("app:%p config rsp from dnode", pRsp->info.ahandle); mDebug("config rsp from dnode, app:%p", pRsp->info.ahandle);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -420,6 +420,11 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* ...@@ -420,6 +420,11 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
setQueryTimewindow(pReadHandle, pCond); setQueryTimewindow(pReadHandle, pCond);
if (pCond->numOfCols > 0) { if (pCond->numOfCols > 0) {
int32_t rowLen = 0;
for(int32_t i = 0; i < pCond->numOfCols; ++i) {
rowLen += pCond->colList[i].bytes;
}
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
if (pReadHandle->suppInfo.pstatis == NULL) { if (pReadHandle->suppInfo.pstatis == NULL) {
......
...@@ -118,11 +118,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ...@@ -118,11 +118,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
break; break;
} }
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
......
...@@ -14,8 +14,6 @@ ...@@ -14,8 +14,6 @@
*/ */
#include "executor.h" #include "executor.h"
#include <executorimpl.h>
#include <vnode.h>
#include "executorimpl.h" #include "executorimpl.h"
#include "planner.h" #include "planner.h"
#include "tdatablock.h" #include "tdatablock.h"
......
...@@ -586,8 +586,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow ...@@ -586,8 +586,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
int32_t numOfRows = pCtx[k].input.numOfRows; int32_t numOfRows = pCtx[k].input.numOfRows;
int32_t startOffset = pCtx[k].input.startRowIndex; int32_t startOffset = pCtx[k].input.startRowIndex;
int32_t pos = (order == TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1); pCtx[k].input.startRowIndex = offset;
pCtx[k].input.startRowIndex = pos;
pCtx[k].input.numOfRows = forwardStep; pCtx[k].input.numOfRows = forwardStep;
if (tsCol != NULL) { if (tsCol != NULL) {
...@@ -758,45 +757,6 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt ...@@ -758,45 +757,6 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
} }
} }
} }
// setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns);
// uint32_t flag = pOperator->pExpr[i].base.pParam[0].pCol->flag;
// if (TSDB_COL_IS_NORMAL_COL(flag) /*|| (pCtx[i].functionId == FUNCTION_BLKINFO) ||
// (TSDB_COL_IS_TAG(flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)*/) {
// SColumn* pCol = pOperator->pExpr[i].base.pParam[0].pCol;
// if (pCtx[i].columnIndex == -1) {
// for(int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
// SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
// if (pColData->info.colId == pCol->colId) {
// pCtx[i].columnIndex = j;
// break;
// }
// }
// }
// uint32_t status = aAggs[pCtx[i].functionId].status;
// if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) {
// SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
// In case of the top/bottom query again the nest query result, which has no timestamp column
// don't set the ptsList attribute.
// if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
// pCtx[i].ptsList = (int64_t*) tsInfo->pData;
// } else {
// pCtx[i].ptsList = NULL;
// }
// }
// } else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) {
// SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
// SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex);
//
// pCtx[i].pInput = p->pData;
// assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type);
// for(int32_t j = 0; j < pBlock->info.rows; ++j) {
// char* dst = p->pData + j * p->info.bytes;
// taosVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true);
// }
// }
} }
return code; return code;
...@@ -2842,9 +2802,11 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI ...@@ -2842,9 +2802,11 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = {0}; SColumnInfoData idata = {0};
idata.info.type = pSchema[i].type; idata.info.type = pSchema[i].type;
idata.info.bytes = pSchema[i].bytes; idata.info.bytes = pSchema[i].bytes;
idata.info.colId = pSchema[i].colId; idata.info.colId = pSchema[i].colId;
idata.hasNull = true;
taosArrayPush(pBlock->pDataBlock, &idata); taosArrayPush(pBlock->pDataBlock, &idata);
if (IS_VAR_DATA_TYPE(idata.info.type)) { if (IS_VAR_DATA_TYPE(idata.info.type)) {
......
...@@ -898,11 +898,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -898,11 +898,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan
if (upRes) { if (upRes) {
pInfo->pUpdateRes = upRes; pInfo->pUpdateRes = upRes;
if (upRes->info.type = STREAM_REPROCESS) { if (upRes->info.type == STREAM_REPROCESS) {
pInfo->updateResIndex = 0; pInfo->updateResIndex = 0;
prepareDataScan(pInfo); prepareDataScan(pInfo);
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (upRes->info.type = STREAM_INVERT) { } else if (upRes->info.type == STREAM_INVERT) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;
return upRes; return upRes;
} }
......
...@@ -1072,7 +1072,9 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, ...@@ -1072,7 +1072,9 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo,
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info; SIntervalAggOperatorInfo* pInfo = pOperator->info;
int32_t order = TSDB_ORDER_ASC; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
pInfo->order = TSDB_ORDER_ASC;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
...@@ -1086,11 +1088,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1086,11 +1088,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
} }
// STimeWindow win = {0};
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = NULL; SArray* pUpdated = NULL;
while (1) { while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
...@@ -1103,16 +1103,18 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1103,16 +1103,18 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
// The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the // The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now. // caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
if (pInfo->invertible) { if (pInfo->invertible) {
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type); setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
} }
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_REPROCESS) {
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval,
pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock); pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue; continue;
} }
pInfo->order = TSDB_ORDER_ASC;
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
} }
......
...@@ -207,6 +207,9 @@ bool fmIsInvertible(int32_t funcId) { ...@@ -207,6 +207,9 @@ bool fmIsInvertible(int32_t funcId) {
case FUNCTION_TYPE_SUM: case FUNCTION_TYPE_SUM:
case FUNCTION_TYPE_STDDEV: case FUNCTION_TYPE_STDDEV:
case FUNCTION_TYPE_AVG: case FUNCTION_TYPE_AVG:
case FUNCTION_TYPE_WSTARTTS:
case FUNCTION_TYPE_WENDTS:
case FUNCTION_TYPE_WDURATION:
res = true; res = true;
break; break;
default: default:
......
...@@ -64,7 +64,11 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInte ...@@ -64,7 +64,11 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInte
*(double*)(newInterBuf->buf) = sumSquares; *(double*)(newInterBuf->buf) = sumSquares;
newInterBuf->bufLen = sizeof(double); newInterBuf->bufLen = sizeof(double);
} }
newInterBuf->numOfResult = numOutput; if (interBuf->numOfResult == 0 && numOutput == 0) {
newInterBuf->numOfResult = 0;
} else {
newInterBuf->numOfResult = 1;
}
return 0; return 0;
} }
......
...@@ -137,7 +137,11 @@ endi ...@@ -137,7 +137,11 @@ endi
sql insert into t1 values(1648791223001,12,14,13,11.1); sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 500 sleep 500
sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; sql select * from streamt;
print count(*) , count(d) , sum(a) , max(b) , min(c)
print 0: $data00 , $data01 , $data02 , $data03 , $data04 , $data05
print 1: $data10 , $data11 , $data12 , $data13 , $data14 , $data15
if $rows != 4 then if $rows != 4 then
print ======$rows print ======$rows
......
...@@ -354,7 +354,7 @@ class TDTestCase: ...@@ -354,7 +354,7 @@ class TDTestCase:
event.wait() event.wait()
tdLog.info("start consume processor") tdLog.info("start consume processor")
pollDelay = 5 pollDelay = 15
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册