未验证 提交 cfd08b16 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10781 from taosdata/feature/tq

task deploy and task exec
......@@ -2291,6 +2291,10 @@ typedef struct {
// TODO: other info needed by task
} SStreamTaskExecReq;
typedef struct {
int32_t reserved;
} SStreamTaskExecRsp;
#pragma pack(pop)
#ifdef __cplusplus
......
......@@ -200,6 +200,7 @@ enum {
// Requests handled by SNODE
TD_NEW_MSG_SEG(TDMT_SND_MSG)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
#if defined(TD_MSG_NUMBER_)
TDMT_MAX
......
......@@ -402,6 +402,29 @@ static int32_t tDecodeCStrTo(SCoder* pDecoder, char* val) {
return 0;
}
static FORCE_INLINE int32_t tDecodeBinaryAlloc(SCoder* pDecoder, void** val, uint64_t* len) {
if (tDecodeU64v(pDecoder, len) < 0) return -1;
if (TD_CODER_CHECK_CAPACITY_FAILED(pDecoder, *len)) return -1;
*val = malloc(*len);
if (*val == NULL) return -1;
memcpy(*val, TD_CODER_CURRENT(pDecoder), *len);
TD_CODER_MOVE_POS(pDecoder, *len);
return 0;
}
static FORCE_INLINE int32_t tDecodeCStrAndLenAlloc(SCoder* pDecoder, char** val, uint64_t* len) {
if (tDecodeBinaryAlloc(pDecoder, (void**)val, len) < 0) return -1;
(*len) -= 1;
return 0;
}
static FORCE_INLINE int32_t tDecodeCStrAlloc(SCoder* pDecoder, char** val) {
uint64_t len;
return tDecodeCStrAndLenAlloc(pDecoder, val, &len);
}
static FORCE_INLINE bool tDecodeIsEnd(SCoder* pCoder) { return (pCoder->size == pCoder->pos); }
#ifdef __cplusplus
......
......@@ -2674,9 +2674,9 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeCStr(&decoder, (const char **)&pReq->sql) < 0) return -1;
if (tDecodeCStr(&decoder, (const char **)&pReq->physicalPlan) < 0) return -1;
if (tDecodeCStr(&decoder, (const char **)&pReq->logicalPlan) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->physicalPlan) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->logicalPlan) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
......@@ -2706,7 +2706,7 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
if (tDecodeCStr(pDecoder, (const char **)&pTask->qmsg) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
......
......@@ -39,8 +39,8 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
if (tDecodeCStr(pDecoder, (const char **)&pObj->sql) < 0) return -1;
if (tDecodeCStr(pDecoder, (const char **)&pObj->logicalPlan) < 0) return -1;
if (tDecodeCStr(pDecoder, (const char **)&pObj->physicalPlan) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
return 0;
}
......@@ -51,6 +51,7 @@ SStreamMeta* sndMetaNew();
void sndMetaDelete(SStreamMeta* pMeta);
int32_t sndMetaDeployTask(SStreamMeta* pMeta, SStreamTask* pTask);
SStreamTask* sndMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
......
......@@ -68,6 +68,10 @@ int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) {
return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *));
}
SStreamTask *sndMetaGetTask(SStreamMeta *pMeta, int32_t taskId) {
return taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
}
int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
SStreamTask *pTask = taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
if (pTask == NULL) {
......@@ -79,6 +83,16 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
return taosHashRemove(pMeta->pHash, &taskId, sizeof(int32_t));
}
static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont;
int32_t taskId = pHead->streamTaskId;
SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId);
if (pTask == NULL) {
return -1;
}
return 0;
}
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
// stream deploy
// stream stop/resume
......@@ -95,13 +109,20 @@ int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
tCoderClear(&decoder);
sndMetaDeployTask(pSnode->pMeta, pTask);
} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) {
sndProcessTaskExecReq(pSnode, pMsg);
} else {
//
ASSERT(0);
}
return 0;
}
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
// operator exec
if (pMsg->msgType == TDMT_SND_TASK_EXEC) {
sndProcessTaskExecReq(pSnode, pMsg);
} else {
ASSERT(0);
}
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册