From 70cddceb605eb9efe947d2ca9bc8a5bb146f34ad Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 21 Mar 2022 16:35:27 +0800 Subject: [PATCH] assign task to vg --- include/common/tmsg.h | 16 +- source/client/src/tmq.c | 93 +++++++++++- source/common/src/tmsg.c | 39 ++++- source/dnode/mnode/impl/inc/mndVgroup.h | 2 +- source/dnode/mnode/impl/src/mndScheduler.c | 165 ++++++++------------- source/dnode/mnode/impl/src/mndStream.c | 4 +- source/dnode/mnode/impl/src/mndVgroup.c | 6 +- source/dnode/snode/src/snode.c | 2 +- 8 files changed, 203 insertions(+), 124 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f148009f81..852f77777a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1129,10 +1129,10 @@ typedef struct { typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; + char outputTbName[TSDB_TABLE_NAME_LEN]; int8_t igExists; char* sql; - char* physicalPlan; - char* logicalPlan; + char* ast; } SCMCreateStreamReq; typedef struct { @@ -2273,13 +2273,23 @@ enum { STREAM_TASK_STATUS__STOP, }; +typedef struct { + void* inputHandle; + void** executor; +} SStreamTaskParRunner; + typedef struct { int64_t streamId; int32_t taskId; int32_t level; int8_t status; + int8_t pipeEnd; + int8_t parallel; + SEpSet NextOpEp; char* qmsg; - void* executor; + // not applied to encoder and decoder + SStreamTaskParRunner runner; + // void* executor; // void* stateStore; // storage handle } SStreamTask; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index dfb0a8fcf5..a8fc394720 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -456,6 +456,94 @@ _return: void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; } +TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) { + STscObj* pTscObj = (STscObj*)taos; + SRequestObj* pRequest = NULL; + SQuery* pQueryNode = NULL; + char* astStr = NULL; + int32_t sqlLen; + + terrno = TSDB_CODE_SUCCESS; + if (taos == NULL || streamName == NULL || sql == NULL) { + tscError("invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s", taos, streamName, sql); + terrno = TSDB_CODE_TSC_INVALID_INPUT; + goto _return; + } + sqlLen = strlen(sql); + + if (strlen(streamName) >= TSDB_TABLE_NAME_LEN) { + tscError("stream name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1); + terrno = TSDB_CODE_TSC_INVALID_INPUT; + goto _return; + } + + if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) { + tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); + terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; + goto _return; + } + + tscDebug("start to create stream: %s", streamName); + + CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); + CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode), _return); + + // todo check for invalid sql statement and return with error code + + CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return); + + /*printf("%s\n", pStr);*/ + + SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T}; + strcpy(name.dbname, pRequest->pDb); + strcpy(name.tname, streamName); + + SCMCreateStreamReq req = { + .igExists = 1, + .ast = astStr, + .sql = (char*)sql, + }; + tNameExtractFullName(&name, req.name); + strcpy(req.outputTbName, tbName); + + int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req); + void* buf = malloc(tlen); + if (buf == NULL) { + goto _return; + } + + tSerializeSCMCreateStreamReq(buf, tlen, &req); + /*printf("formatted: %s\n", dagStr);*/ + + pRequest->body.requestMsg = (SDataBuf){ + .pData = buf, + .len = tlen, + .handle = NULL, + }; + pRequest->type = TDMT_MND_CREATE_STREAM; + + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + + int64_t transporterId = 0; + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + + tsem_wait(&pRequest->body.rspSem); + +_return: + tfree(astStr); + qDestroyQuery(pQueryNode); + /*if (sendInfo != NULL) {*/ + /*destroySendMsgInfo(sendInfo);*/ + /*}*/ + + if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { + pRequest->code = terrno; + } + + return pRequest; +} + TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) { STscObj* pTscObj = (STscObj*)taos; SRequestObj* pRequest = NULL; @@ -481,7 +569,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i goto _return; } - tscDebug("start to create topic, %s", topicName); + tscDebug("start to create topic: %s", topicName); CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(parseSql(pRequest, true, &pQueryNode), _return); @@ -498,7 +586,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i SCMCreateTopicReq req = { .igExists = 1, - .ast = (char*)astStr, + .ast = astStr, .sql = (char*)sql, }; tNameExtractFullName(&name, req.name); @@ -528,6 +616,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i tsem_wait(&pRequest->body.rspSem); _return: + tfree(astStr); qDestroyQuery(pQueryNode); /*if (sendInfo != NULL) {*/ /*destroySendMsgInfo(sendInfo);*/ diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9472b19cca..1d2c9397c0 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2469,7 +2469,7 @@ int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) { if (tStartDecode(decoder) < 0) return -1; if (tDecodeI32(decoder, &pReq->num) < 0) return -1; - TCODER_MALLOC(pReq->offsets, SMqOffset*, pReq->num * sizeof(SMqOffset), decoder); + TCODER_MALLOC(pReq->offsets, SMqOffset *, pReq->num * sizeof(SMqOffset), decoder); if (pReq->offsets == NULL) return -1; for (int32_t i = 0; i < pReq->num; i++) { tDecodeSMqOffset(decoder, &pReq->offsets[i]); @@ -2653,15 +2653,22 @@ void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) { } int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) { + int32_t sqlLen = 0; + int32_t astLen = 0; + if (pReq->sql != NULL) sqlLen = (int32_t)strlen(pReq->sql); + if (pReq->ast != NULL) astLen = (int32_t)strlen(pReq->ast); + SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); if (tStartEncode(&encoder) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->outputTbName) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->physicalPlan) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->logicalPlan) < 0) return -1; + if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1; + if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1; + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -2670,15 +2677,30 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS } int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) { + int32_t sqlLen = 0; + int32_t astLen = 0; + SCoder decoder = {0}; tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); if (tStartDecode(&decoder) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->outputTbName) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; - if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1; - if (tDecodeCStrAlloc(&decoder, &pReq->physicalPlan) < 0) return -1; - if (tDecodeCStrAlloc(&decoder, &pReq->logicalPlan) < 0) return -1; + if (tDecodeI32(&decoder, &sqlLen) < 0) return -1; + if (tDecodeI32(&decoder, &astLen) < 0) return -1; + + if (sqlLen > 0) { + pReq->sql = calloc(1, sqlLen + 1); + if (pReq->sql == NULL) return -1; + if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1; + } + + if (astLen > 0) { + pReq->ast = calloc(1, astLen + 1); + if (pReq->ast == NULL) return -1; + if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1; + } tEndDecode(&decoder); tCoderClear(&decoder); @@ -2687,8 +2709,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { tfree(pReq->sql); - tfree(pReq->physicalPlan); - tfree(pReq->logicalPlan); + tfree(pReq->ast); } int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { @@ -2697,6 +2718,7 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->level) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; @@ -2708,6 +2730,7 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) { if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1; tEndDecode(pDecoder); return 0; diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 85e9a15bd4..f42829eddf 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -28,7 +28,7 @@ SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId); void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup); SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup); int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); -SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup); +SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup); int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index faef757e76..c28c0d76c4 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -31,6 +31,53 @@ #include "tname.h" #include "tuuid.h" +int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet) { + SCoder encoder; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); + tEncodeSStreamTask(&encoder, pTask); + int32_t tlen = sizeof(SMsgHead) + encoder.pos; + tCoderClear(&encoder); + void* buf = malloc(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + ((SMsgHead*)buf)->streamTaskId = pTask->taskId; + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER); + tEncodeSStreamTask(&encoder, pTask); + tCoderClear(&encoder); + + STransAction action = {0}; + memcpy(&action.epSet, pEpSet, sizeof(SEpSet)); + action.pCont = buf; + action.contLen = tlen; + action.msgType = TDMT_SND_TASK_DEPLOY; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + rpcFreeCont(buf); + return -1; + } + return 0; +} + +int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) { + int32_t msgLen; + plan->execNode.nodeId = pVgroup->vgId; + plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup); + + if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet); + return 0; +} + +int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, + const SSnodeObj* pSnode) { + return 0; +} + int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; SVgObj* pVgroup = NULL; @@ -44,7 +91,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); pStream->tasks = taosArrayInit(totLevel, sizeof(SArray)); - int32_t msgLen; for (int32_t level = 0; level < totLevel; level++) { SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask)); SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level); @@ -67,43 +113,16 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // send to vnode SStreamTask* pTask = streamTaskNew(pStream->uid, level); - - plan->execNode.nodeId = pVgroup->vgId; - plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup); - if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) { + // TODO: set to + pTask->parallel = 4; + if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); - terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } taosArrayPush(taskOneLevel, pTask); - - SCoder encoder; - tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); - tEncodeSStreamTask(&encoder, pTask); - int32_t tlen = sizeof(SMsgHead) + encoder.pos; - tCoderClear(&encoder); - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - ((SMsgHead*)buf)->streamTaskId = pTask->taskId; - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER); - tEncodeSStreamTask(&encoder, pTask); - tCoderClear(&encoder); - - STransAction action = {0}; - action.epSet = plan->execNode.epSet; - action.pCont = buf; - action.contLen = tlen; - action.msgType = TDMT_VND_TASK_DEPLOY; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - rpcFreeCont(buf); - return -1; - } } + } else if (plan->subplanType == SUBPLAN_TYPE_SCAN) { // duplicatable @@ -113,88 +132,26 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // if has snode, set to shared thread num in snode parallel = SND_SHARED_THREAD_NUM; - for (int32_t i = 0; i < parallel; i++) { - SStreamTask* pTask = streamTaskNew(pStream->uid, level); - - // TODO:get snode id and ep - plan->execNode.nodeId = pVgroup->vgId; - plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup); - - if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) { - qDestroyQueryPlan(pPlan); - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; - } - - taosArrayPush(taskOneLevel, pTask); - - SCoder encoder; - tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); - tEncodeSStreamTask(&encoder, pTask); - int32_t tlen = sizeof(SMsgHead) + encoder.pos; - tCoderClear(&encoder); - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - ((SMsgHead*)buf)->streamTaskId = pTask->taskId; - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER); - tEncodeSStreamTask(&encoder, pTask); - tCoderClear(&encoder); - - STransAction action = {0}; - action.epSet = plan->execNode.epSet; - action.pCont = buf; - action.contLen = tlen; - action.msgType = TDMT_SND_TASK_DEPLOY; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - rpcFreeCont(buf); - return -1; - } - } - } else { - // not duplicatable SStreamTask* pTask = streamTaskNew(pStream->uid, level); - + pTask->parallel = parallel; // TODO:get snode id and ep - plan->execNode.nodeId = pVgroup->vgId; - plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup); - - if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) { + if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); - terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } taosArrayPush(taskOneLevel, pTask); + } else { + // not duplicatable + SStreamTask* pTask = streamTaskNew(pStream->uid, level); - SCoder encoder; - tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); - tEncodeSStreamTask(&encoder, pTask); - int32_t tlen = sizeof(SMsgHead) + encoder.pos; - tCoderClear(&encoder); - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - ((SMsgHead*)buf)->streamTaskId = pTask->taskId; - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER); - tEncodeSStreamTask(&encoder, pTask); - tCoderClear(&encoder); - - STransAction action = {0}; - action.epSet = plan->execNode.epSet; - action.pCont = buf; - action.contLen = tlen; - action.msgType = TDMT_SND_TASK_DEPLOY; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - rpcFreeCont(buf); + // TODO: get snode + if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { + sdbRelease(pSdb, pVgroup); + qDestroyQueryPlan(pPlan); return -1; } + taosArrayPush(taskOneLevel, pTask); } taosArrayPush(pStream->tasks, taskOneLevel); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 45d45bef2d..e7cc12bb96 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -220,8 +220,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe streamObj.dbUid = pDb->uid; streamObj.version = 1; streamObj.sql = pCreate->sql; - streamObj.physicalPlan = pCreate->physicalPlan; - streamObj.logicalPlan = pCreate->logicalPlan; + streamObj.physicalPlan = ""; + streamObj.logicalPlan = ""; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); if (pTrans == NULL) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 6aee662675..f7d5226e77 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -433,12 +433,12 @@ ALLOC_VGROUP_OVER: return code; } -SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) { +SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) { SEpSet epset = {0}; for (int32_t v = 0; v < pVgroup->replica; ++v) { - SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + const SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); if (pDnode == NULL) continue; if (pVgid->role == TAOS_SYNC_STATE_LEADER) { diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 1e9e48e206..f4129e37ce 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -57,7 +57,7 @@ void sndMetaDelete(SStreamMeta *pMeta) { } int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) { - pTask->executor = qCreateStreamExecTaskInfo(pTask->qmsg, NULL); + pTask->runner.executor = qCreateStreamExecTaskInfo(pTask->qmsg, NULL); return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *)); } -- GitLab