未验证 提交 0a168cba 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #12768 from taosdata/feature/tq

refactor(stream)
...@@ -2589,18 +2589,6 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) { ...@@ -2589,18 +2589,6 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp); taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp);
} }
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t sourceVg;
int64_t sourceVer;
SArray* data; // SArray<SSDataBlock>
} SStreamDispatchReq;
typedef struct {
int8_t inputStatus;
} SStreamDispatchRsp;
#define TD_AUTO_CREATE_TABLE 0x1 #define TD_AUTO_CREATE_TABLE 0x1
typedef struct { typedef struct {
int64_t suid; int64_t suid;
......
...@@ -200,6 +200,10 @@ enum { ...@@ -200,6 +200,10 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
......
...@@ -107,7 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) ...@@ -107,7 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit)
if (ref == 0) { if (ref == 0) {
taosMemoryFree(pDataSubmit->data); taosMemoryFree(pDataSubmit->data);
taosMemoryFree(pDataSubmit->dataRef); taosMemoryFree(pDataSubmit->dataRef);
taosFreeQitem(pDataSubmit); // taosFreeQitem(pDataSubmit);
} }
} }
...@@ -286,6 +286,36 @@ typedef struct { ...@@ -286,6 +286,36 @@ typedef struct {
int32_t taskId; int32_t taskId;
} SStreamTaskRunReq; } SStreamTaskRunReq;
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t sourceTaskId;
int32_t sourceVg;
#if 0
int64_t sourceVer;
#endif
SArray* data; // SArray<SSDataBlock>
} SStreamDispatchReq;
typedef struct {
int64_t streamId;
int32_t taskId;
int8_t inputStatus;
} SStreamDispatchRsp;
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t sourceTaskId;
int32_t sourceVg;
} SStreamTaskRecoverReq;
typedef struct {
int64_t streamId;
int32_t taskId;
int8_t inputStatus;
} SStreamTaskRecoverRsp;
int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input); int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input);
int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input); int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input);
int32_t streamDequeueOutput(SStreamTask* pTask, void** output); int32_t streamDequeueOutput(SStreamTask* pTask, void** output);
...@@ -296,6 +326,12 @@ int32_t streamTaskRun(SStreamTask* pTask); ...@@ -296,6 +326,12 @@ int32_t streamTaskRun(SStreamTask* pTask);
int32_t streamTaskHandleInput(SStreamTask* pTask, void* data); int32_t streamTaskHandleInput(SStreamTask* pTask, void* data);
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
int32_t streamTaskProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp);
int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -314,6 +314,10 @@ SArray *vmGetMsgHandles() { ...@@ -314,6 +314,10 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_MERGE_EXEC, vmPutNodeMsgToMergeQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_MERGE_EXEC, vmPutNodeMsgToMergeQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_WRITE_EXEC, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_WRITE_EXEC, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
......
...@@ -121,10 +121,18 @@ int tqCommit(STQ*); ...@@ -121,10 +121,18 @@ int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
#if 0
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); #endif
int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
// sma // sma
int32_t smaOpen(SVnode* pVnode); int32_t smaOpen(SVnode* pVnode);
......
...@@ -105,12 +105,11 @@ static void tdSRowDemo() { ...@@ -105,12 +105,11 @@ static void tdSRowDemo() {
} }
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
void* pIter = NULL; void* pIter = NULL;
STqExec* pExec = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pTq->execs, pIter); pIter = taosHashIterate(pTq->execs, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
pExec = (STqExec*)pIter; STqExec* pExec = (STqExec*)pIter;
if (pExec->subType == TOPIC_SUB_TYPE__DB) { if (pExec->subType == TOPIC_SUB_TYPE__DB) {
if (!isAdd) { if (!isAdd) {
int32_t sz = taosArrayGetSize(tbUidList); int32_t sz = taosArrayGetSize(tbUidList);
...@@ -274,6 +273,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -274,6 +273,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
} }
memcpy(data, msg, msgLen); memcpy(data, msg, msgLen);
tqProcessStreamTriggerNew(pTq, data);
#if 0
SRpcMsg req = { SRpcMsg req = {
.msgType = TDMT_VND_STREAM_TRIGGER, .msgType = TDMT_VND_STREAM_TRIGGER,
.pCont = data, .pCont = data,
...@@ -281,6 +283,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -281,6 +283,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
}; };
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req); tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req);
#endif
return 0; return 0;
} }
...@@ -975,12 +978,24 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -975,12 +978,24 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
} }
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
pTask->status = TASK_STATUS__IDLE;
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
pTask->inputQ = taosOpenQueue();
pTask->outputQ = taosOpenQueue();
pTask->inputQAll = taosAllocateQall();
pTask->outputQAll = taosAllocateQall();
if (pTask->inputQ == NULL || pTask->outputQ == NULL || pTask->inputQAll == NULL || pTask->outputQAll == NULL)
goto FAIL;
if (pTask->execType != TASK_EXEC__NONE) { if (pTask->execType != TASK_EXEC__NONE) {
// expand runners // expand runners
pTask->exec.numOfRunners = parallel; pTask->exec.numOfRunners = parallel;
pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner)); pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner));
if (pTask->exec.runners == NULL) { if (pTask->exec.runners == NULL) {
return -1; goto FAIL;
} }
for (int32_t i = 0; i < parallel; i++) { for (int32_t i = 0; i < parallel; i++) {
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
...@@ -1002,6 +1017,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { ...@@ -1002,6 +1017,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
} }
return 0; return 0;
FAIL:
if (pTask->inputQ) taosCloseQueue(pTask->inputQ);
if (pTask->outputQ) taosCloseQueue(pTask->outputQ);
if (pTask->inputQAll) taosFreeQall(pTask->inputQAll);
if (pTask->outputQAll) taosFreeQall(pTask->outputQAll);
if (pTask) taosMemoryFree(pTask);
return -1;
} }
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
...@@ -1053,6 +1075,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t wo ...@@ -1053,6 +1075,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t wo
return 0; return 0;
} }
#if 0
int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data) { int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data) {
SStreamDataSubmit* pSubmit = NULL; SStreamDataSubmit* pSubmit = NULL;
...@@ -1103,6 +1126,7 @@ FAIL: ...@@ -1103,6 +1126,7 @@ FAIL:
} }
return -1; return -1;
} }
#endif
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) { int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) {
SStreamTaskExecReq req; SStreamTaskExecReq req;
...@@ -1120,25 +1144,28 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) ...@@ -1120,25 +1144,28 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId)
return 0; return 0;
} }
int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) { int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* pReq) {
void* pIter = NULL; void* pIter = NULL;
bool failed = false; bool failed = false;
SStreamDataSubmit* pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); SStreamDataSubmit* pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
if (pSubmit == NULL) { if (pSubmit == NULL) {
failed = true; failed = true;
goto SET_TASK_FAIL;
} }
pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t)); pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t));
if (pSubmit->dataRef == NULL) { if (pSubmit->dataRef == NULL) {
failed = true; failed = true;
goto SET_TASK_FAIL;
} }
pSubmit->type = STREAM_DATA_TYPE_SUBMIT_BLOCK; pSubmit->type = STREAM_INPUT__DATA_SUBMIT;
pSubmit->sourceVer = ver; /*pSubmit->sourceVer = ver;*/
pSubmit->sourceVg = pTq->pVnode->config.vgId; /*pSubmit->sourceVg = pTq->pVnode->config.vgId;*/
pSubmit->data = pReq; pSubmit->data = pReq;
*pSubmit->dataRef = 1; *pSubmit->dataRef = 1;
SET_TASK_FAIL:
while (1) { while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter); pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
...@@ -1157,7 +1184,18 @@ int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) { ...@@ -1157,7 +1184,18 @@ int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
int8_t execStatus = atomic_load_8(&pTask->status); int8_t execStatus = atomic_load_8(&pTask->status);
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
// TODO dispatch task launch msg to fetch queue SStreamTaskRunReq* pRunReq = taosMemoryMalloc(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) continue;
// TODO: do we need htonl?
pRunReq->head.vgId = pTq->pVnode->config.vgId;
pRunReq->streamId = pTask->streamId;
pRunReq->taskId = pTask->taskId;
SRpcMsg msg = {
.msgType = TDMT_VND_TASK_RUN,
.pCont = pRunReq,
.contLen = sizeof(SStreamTaskRunReq),
};
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &msg);
} }
} else { } else {
...@@ -1169,11 +1207,53 @@ int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) { ...@@ -1169,11 +1207,53 @@ int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
streamDataSubmitRefDec(pSubmit); streamDataSubmitRefDec(pSubmit);
return 0; return 0;
} else { } else {
if (pSubmit) {
if (pSubmit->dataRef) {
taosMemoryFree(pSubmit->dataRef);
}
taosFreeQitem(pSubmit);
}
return -1; return -1;
} }
} }
int32_t tqProcessTaskExec2(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
// //
SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
streamTaskProcessRunReq(pTask, &pTq->pVnode->msgCb);
return 0;
}
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
streamTaskProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
return 0;
}
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
streamTaskProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
return 0;
}
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
streamTaskProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp);
return 0;
}
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
streamTaskProcessRecoverRsp(pTask, pRsp);
return 0; return 0;
} }
...@@ -106,11 +106,13 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ...@@ -106,11 +106,13 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
pMsg->contLen - sizeof(SMsgHead)) < 0) { pMsg->contLen - sizeof(SMsgHead)) < 0) {
} }
} break; } break;
#if 0
case TDMT_VND_TASK_WRITE_EXEC: { case TDMT_VND_TASK_WRITE_EXEC: {
if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead), if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
0) < 0) { 0) < 0) {
} }
} break; } break;
#endif
case TDMT_VND_ALTER_VNODE: case TDMT_VND_ALTER_VNODE:
break; break;
default: default:
...@@ -162,7 +164,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -162,7 +164,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("message in fetch queue is processing"); vTrace("message in fetch queue is processing");
char * msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
...@@ -181,15 +183,32 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -181,15 +183,32 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return vnodeGetTableMeta(pVnode, pMsg); return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId); return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
case TDMT_VND_TASK_RUN: {
int32_t code = tqProcessTaskRunReq(pVnode->pTq, pMsg);
pMsg->pCont = NULL;
return code;
}
case TDMT_VND_TASK_DISPATCH:
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_RECOVER:
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_VND_TASK_RECOVER_RSP:
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
#if 0
case TDMT_VND_TASK_PIPE_EXEC: case TDMT_VND_TASK_PIPE_EXEC:
case TDMT_VND_TASK_MERGE_EXEC: case TDMT_VND_TASK_MERGE_EXEC:
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0); return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);
case TDMT_VND_STREAM_TRIGGER: { case TDMT_VND_STREAM_TRIGGER:{
// refactor, avoid double free // refactor, avoid double free
int code = tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0); int code = tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
pMsg->pCont = NULL; pMsg->pCont = NULL;
return code; return code;
} }
#endif
case TDMT_VND_QUERY_HEARTBEAT: case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
default: default:
...@@ -332,12 +351,12 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -332,12 +351,12 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
SDecoder decoder = {0}; SDecoder decoder = {0};
int rcode = 0; int rcode = 0;
SVCreateTbBatchReq req = {0}; SVCreateTbBatchReq req = {0};
SVCreateTbReq * pCreateReq; SVCreateTbReq *pCreateReq;
SVCreateTbBatchRsp rsp = {0}; SVCreateTbBatchRsp rsp = {0};
SVCreateTbRsp cRsp = {0}; SVCreateTbRsp cRsp = {0};
char tbName[TSDB_TABLE_FNAME_LEN]; char tbName[TSDB_TABLE_FNAME_LEN];
STbUidStore * pStore = NULL; STbUidStore *pStore = NULL;
SArray * tbUids = NULL; SArray *tbUids = NULL;
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
pRsp->code = TSDB_CODE_SUCCESS; pRsp->code = TSDB_CODE_SUCCESS;
...@@ -521,7 +540,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -521,7 +540,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
SDecoder decoder = {0}; SDecoder decoder = {0};
SEncoder encoder = {0}; SEncoder encoder = {0};
int ret; int ret;
SArray * tbUids = NULL; SArray *tbUids = NULL;
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP; pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
pRsp->pCont = NULL; pRsp->pCont = NULL;
...@@ -576,9 +595,9 @@ _exit: ...@@ -576,9 +595,9 @@ _exit:
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) { static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
STSchema * pSchema = NULL; STSchema *pSchema = NULL;
tb_uid_t suid = 0; tb_uid_t suid = 0;
STSRow * row = NULL; STSRow *row = NULL;
tInitSubmitBlkIter(msgIter, pBlock, &blkIter); tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
if (blkIter.row == NULL) return 0; if (blkIter.row == NULL) return 0;
...@@ -609,8 +628,8 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub ...@@ -609,8 +628,8 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) { static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
SMeta * pMeta = pVnode->pMeta; SMeta *pMeta = pVnode->pMeta;
SSubmitBlk * pBlock = NULL; SSubmitBlk *pBlock = NULL;
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) { while (true) {
...@@ -624,10 +643,10 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char ...@@ -624,10 +643,10 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char
} }
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SSubmitReq * pSubmitReq = (SSubmitReq *)pReq; SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
SSubmitRsp submitRsp = {0}; SSubmitRsp submitRsp = {0};
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock; SSubmitBlk *pBlock;
SSubmitRsp rsp = {0}; SSubmitRsp rsp = {0};
SVCreateTbReq createTbReq = {0}; SVCreateTbReq createTbReq = {0};
SDecoder decoder = {0}; SDecoder decoder = {0};
......
...@@ -68,7 +68,7 @@ static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMs ...@@ -68,7 +68,7 @@ static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMs
// get groupId, compute hash value // get groupId, compute hash value
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName)); uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
//
// get node // get node
// TODO: optimize search process // TODO: optimize search process
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
...@@ -152,13 +152,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -152,13 +152,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
// exec // exec
while (1) { while (1) {
SSDataBlock* output; SSDataBlock* output = NULL;
uint64_t ts = 0; uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) { if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false); ASSERT(false);
} }
if (output == NULL) break; if (output == NULL) break;
taosArrayPush(pRes, &output); taosArrayPush(pRes, output);
} }
// destroy // destroy
...@@ -189,7 +189,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) { ...@@ -189,7 +189,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) {
taosFreeQitem(data); taosFreeQitem(data);
if (taosArrayGetSize(pRes) != 0) { if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
resQ->type = STREAM_INPUT__DATA_BLOCK; resQ->type = STREAM_INPUT__DATA_BLOCK;
resQ->blocks = pRes; resQ->blocks = pRes;
taosWriteQitem(pTask->outputQ, resQ); taosWriteQitem(pTask->outputQ, resQ);
...@@ -209,7 +209,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) { ...@@ -209,7 +209,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) {
taosFreeQitem(data); taosFreeQitem(data);
if (taosArrayGetSize(pRes) != 0) { if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
resQ->type = STREAM_INPUT__DATA_BLOCK; resQ->type = STREAM_INPUT__DATA_BLOCK;
resQ->blocks = pRes; resQ->blocks = pRes;
taosWriteQitem(pTask->outputQ, resQ); taosWriteQitem(pTask->outputQ, resQ);
...@@ -231,7 +231,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) { ...@@ -231,7 +231,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) {
taosFreeQitem(data); taosFreeQitem(data);
if (taosArrayGetSize(pRes) != 0) { if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
resQ->type = STREAM_INPUT__DATA_BLOCK; resQ->type = STREAM_INPUT__DATA_BLOCK;
resQ->blocks = pRes; resQ->blocks = pRes;
taosWriteQitem(pTask->outputQ, resQ); taosWriteQitem(pTask->outputQ, resQ);
...@@ -253,7 +253,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) { ...@@ -253,7 +253,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) {
taosFreeQitem(data); taosFreeQitem(data);
if (taosArrayGetSize(pRes) != 0) { if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
resQ->type = STREAM_INPUT__DATA_BLOCK; resQ->type = STREAM_INPUT__DATA_BLOCK;
resQ->blocks = pRes; resQ->blocks = pRes;
taosWriteQitem(pTask->outputQ, resQ); taosWriteQitem(pTask->outputQ, resQ);
...@@ -392,12 +392,14 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* ...@@ -392,12 +392,14 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
// 1.2 enqueue // 1.2 enqueue
pBlock->type = STREAM_DATA_TYPE_SSDATA_BLOCK; pBlock->type = STREAM_DATA_TYPE_SSDATA_BLOCK;
pBlock->sourceVg = pReq->sourceVg; pBlock->sourceVg = pReq->sourceVg;
pBlock->sourceVer = pReq->sourceVer; /*pBlock->sourceVer = pReq->sourceVer;*/
taosWriteQitem(pTask->inputQ, pBlock); taosWriteQitem(pTask->inputQ, pBlock);
// 1.3 rsp by input status // 1.3 rsp by input status
SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp)); SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp));
pCont->inputStatus = status; pCont->inputStatus = status;
pCont->streamId = pReq->streamId;
pCont->taskId = pReq->sourceTaskId;
pRsp->pCont = pCont; pRsp->pCont = pCont;
pRsp->contLen = sizeof(SStreamDispatchRsp); pRsp->contLen = sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp); tmsgSendRsp(pRsp);
...@@ -439,12 +441,12 @@ int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) { ...@@ -439,12 +441,12 @@ int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) {
return 0; return 0;
} }
int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, char* msg) { int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) {
// //
return 0; return 0;
} }
int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, char* msg) { int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) {
// //
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册